From 97fd5878d9ed014a4d4a28ac52c34d48c40cf1b8 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Tue, 20 Sep 2022 10:55:23 -0700 Subject: [PATCH] change DDTeamCollection constructor --- fdbserver/DDTeamCollection.actor.cpp | 23 +++++++++++-------- fdbserver/DataDistribution.actor.cpp | 4 ++-- .../include/fdbserver/DDTeamCollection.h | 3 ++- fdbserver/include/fdbserver/DDTxnProcessor.h | 5 ++++ 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index 5507adeed8..1a08c9f097 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -3581,7 +3581,7 @@ bool DDTeamCollection::satisfiesPolicy(const std::vector return result && resultEntries.size() == 0; } -DDTeamCollection::DDTeamCollection(Database const& cx, +DDTeamCollection::DDTeamCollection(const std::shared_ptr& dbProcessor, UID distributorId, MoveKeysLock const& lock, PromiseStream const& output, @@ -3597,9 +3597,9 @@ DDTeamCollection::DDTeamCollection(Database const& cx, PromiseStream getShardMetrics, Promise removeFailedServer, PromiseStream> getUnhealthyRelocationCount) - : doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), lock(lock), output(output), - unhealthyServers(0), storageWiggler(makeReference(this)), processingWiggle(processingWiggle), - shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), + : dbProcessor(dbProcessor), doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), + lock(lock), output(output), unhealthyServers(0), storageWiggler(makeReference(this)), + processingWiggle(processingWiggle), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), initialFailureReactionDelay( delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)), initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay)), recruitingStream(0), @@ -3615,8 +3615,13 @@ DDTeamCollection::DDTeamCollection(Database const& cx, teamCollectionInfoEventHolder(makeReference("TeamCollectionInfo")), storageServerRecruitmentEventHolder( makeReference("StorageServerRecruitment_" + distributorId.toString())), - primary(primary), distributorId(distributorId), cx(cx), configuration(configuration), + primary(primary), distributorId(distributorId), configuration(configuration), storageServerSet(new LocalityMap()) { + + if (!dbProcessor->isMocked()) { + cx = this->dbProcessor->getDb(); + } + if (!primary || configuration.usableRegions == 1) { TraceEvent("DDTrackerStarting", distributorId) .detail("State", "Inactive") @@ -5147,13 +5152,13 @@ public: int processCount) { Database database = DatabaseContext::create( makeReference>(), Never(), LocalityData(), EnableLocalityLoadBalance::False); - + auto txnProcessor = std::shared_ptr(new DDTxnProcessor(database)); DatabaseConfiguration conf; conf.storageTeamSize = teamSize; conf.storagePolicy = policy; auto collection = - std::unique_ptr(new DDTeamCollection(database, + std::unique_ptr(new DDTeamCollection(txnProcessor, UID(0, 0), MoveKeysLock(), PromiseStream(), @@ -5191,13 +5196,13 @@ public: int processCount) { Database database = DatabaseContext::create( makeReference>(), Never(), LocalityData(), EnableLocalityLoadBalance::False); - + auto txnProcessor = std::shared_ptr(new DDTxnProcessor(database)); DatabaseConfiguration conf; conf.storageTeamSize = teamSize; conf.storagePolicy = policy; auto collection = - std::unique_ptr(new DDTeamCollection(database, + std::unique_ptr(new DDTeamCollection(txnProcessor, UID(0, 0), MoveKeysLock(), PromiseStream(), diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 4301584306..8e70b62ea4 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -679,7 +679,7 @@ ACTOR Future dataDistribution(Reference self, std::vector teamCollectionsPtrs; primaryTeamCollection = makeReference( - cx, + self->txnProcessor, self->ddId, self->lock, self->relocationProducer, @@ -700,7 +700,7 @@ ACTOR Future dataDistribution(Reference self, self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; }); if (self->configuration.usableRegions > 1) { remoteTeamCollection = - makeReference(cx, + makeReference(self->txnProcessor, self->ddId, self->lock, self->relocationProducer, diff --git a/fdbserver/include/fdbserver/DDTeamCollection.h b/fdbserver/include/fdbserver/DDTeamCollection.h index c10ab86af5..9dc9faf326 100644 --- a/fdbserver/include/fdbserver/DDTeamCollection.h +++ b/fdbserver/include/fdbserver/DDTeamCollection.h @@ -603,6 +603,7 @@ class DDTeamCollection : public ReferenceCounted { int addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams); public: + std::shared_ptr dbProcessor; Database cx; DatabaseConfiguration configuration; @@ -620,7 +621,7 @@ public: AsyncTrigger printDetailedTeamsInfo; Reference storageServerSet; - DDTeamCollection(Database const& cx, + DDTeamCollection(const std::shared_ptr& dbProcessor, UID distributorId, MoveKeysLock const& lock, PromiseStream const& output, diff --git a/fdbserver/include/fdbserver/DDTxnProcessor.h b/fdbserver/include/fdbserver/DDTxnProcessor.h index 00410a54f8..75bee7b568 100644 --- a/fdbserver/include/fdbserver/DDTxnProcessor.h +++ b/fdbserver/include/fdbserver/DDTxnProcessor.h @@ -36,6 +36,8 @@ public: struct SourceServers { std::vector srcServers, completeSources; // the same as RelocateData.src, RelocateData.completeSources; }; + virtual Database getDb() const = 0; + virtual bool isMocked() const = 0; // get the source server list and complete source server list for range virtual Future getSourceServersForRange(const KeyRangeRef range) = 0; @@ -89,6 +91,9 @@ public: DDTxnProcessor() = default; explicit DDTxnProcessor(Database cx) : cx(cx) {} + Database getDb() const override { return cx; }; + bool isMocked() const override { return false; }; + Future getSourceServersForRange(const KeyRangeRef range) override; // Call NativeAPI implementation directly