change DDTeamCollection constructor
This commit is contained in:
parent
f471f3da04
commit
97fd5878d9
|
@ -3581,7 +3581,7 @@ bool DDTeamCollection::satisfiesPolicy(const std::vector<Reference<TCServerInfo>
|
|||
return result && resultEntries.size() == 0;
|
||||
}
|
||||
|
||||
DDTeamCollection::DDTeamCollection(Database const& cx,
|
||||
DDTeamCollection::DDTeamCollection(const std::shared_ptr<IDDTxnProcessor>& dbProcessor,
|
||||
UID distributorId,
|
||||
MoveKeysLock const& lock,
|
||||
PromiseStream<RelocateShard> const& output,
|
||||
|
@ -3597,9 +3597,9 @@ DDTeamCollection::DDTeamCollection(Database const& cx,
|
|||
PromiseStream<GetMetricsRequest> getShardMetrics,
|
||||
Promise<UID> removeFailedServer,
|
||||
PromiseStream<Promise<int>> getUnhealthyRelocationCount)
|
||||
: doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), lock(lock), output(output),
|
||||
unhealthyServers(0), storageWiggler(makeReference<StorageWiggler>(this)), processingWiggle(processingWiggle),
|
||||
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
|
||||
: dbProcessor(dbProcessor), doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()),
|
||||
lock(lock), output(output), unhealthyServers(0), storageWiggler(makeReference<StorageWiggler>(this)),
|
||||
processingWiggle(processingWiggle), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
|
||||
initialFailureReactionDelay(
|
||||
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)),
|
||||
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay)), recruitingStream(0),
|
||||
|
@ -3615,8 +3615,13 @@ DDTeamCollection::DDTeamCollection(Database const& cx,
|
|||
teamCollectionInfoEventHolder(makeReference<EventCacheHolder>("TeamCollectionInfo")),
|
||||
storageServerRecruitmentEventHolder(
|
||||
makeReference<EventCacheHolder>("StorageServerRecruitment_" + distributorId.toString())),
|
||||
primary(primary), distributorId(distributorId), cx(cx), configuration(configuration),
|
||||
primary(primary), distributorId(distributorId), configuration(configuration),
|
||||
storageServerSet(new LocalityMap<UID>()) {
|
||||
|
||||
if (!dbProcessor->isMocked()) {
|
||||
cx = this->dbProcessor->getDb();
|
||||
}
|
||||
|
||||
if (!primary || configuration.usableRegions == 1) {
|
||||
TraceEvent("DDTrackerStarting", distributorId)
|
||||
.detail("State", "Inactive")
|
||||
|
@ -5147,13 +5152,13 @@ public:
|
|||
int processCount) {
|
||||
Database database = DatabaseContext::create(
|
||||
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);
|
||||
|
||||
auto txnProcessor = std::shared_ptr<IDDTxnProcessor>(new DDTxnProcessor(database));
|
||||
DatabaseConfiguration conf;
|
||||
conf.storageTeamSize = teamSize;
|
||||
conf.storagePolicy = policy;
|
||||
|
||||
auto collection =
|
||||
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
|
||||
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(txnProcessor,
|
||||
UID(0, 0),
|
||||
MoveKeysLock(),
|
||||
PromiseStream<RelocateShard>(),
|
||||
|
@ -5191,13 +5196,13 @@ public:
|
|||
int processCount) {
|
||||
Database database = DatabaseContext::create(
|
||||
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);
|
||||
|
||||
auto txnProcessor = std::shared_ptr<IDDTxnProcessor>(new DDTxnProcessor(database));
|
||||
DatabaseConfiguration conf;
|
||||
conf.storageTeamSize = teamSize;
|
||||
conf.storagePolicy = policy;
|
||||
|
||||
auto collection =
|
||||
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
|
||||
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(txnProcessor,
|
||||
UID(0, 0),
|
||||
MoveKeysLock(),
|
||||
PromiseStream<RelocateShard>(),
|
||||
|
|
|
@ -679,7 +679,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
|
||||
std::vector<DDTeamCollection*> teamCollectionsPtrs;
|
||||
primaryTeamCollection = makeReference<DDTeamCollection>(
|
||||
cx,
|
||||
self->txnProcessor,
|
||||
self->ddId,
|
||||
self->lock,
|
||||
self->relocationProducer,
|
||||
|
@ -700,7 +700,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
|
||||
if (self->configuration.usableRegions > 1) {
|
||||
remoteTeamCollection =
|
||||
makeReference<DDTeamCollection>(cx,
|
||||
makeReference<DDTeamCollection>(self->txnProcessor,
|
||||
self->ddId,
|
||||
self->lock,
|
||||
self->relocationProducer,
|
||||
|
|
|
@ -603,6 +603,7 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
|
|||
int addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams);
|
||||
|
||||
public:
|
||||
std::shared_ptr<IDDTxnProcessor> dbProcessor;
|
||||
Database cx;
|
||||
|
||||
DatabaseConfiguration configuration;
|
||||
|
@ -620,7 +621,7 @@ public:
|
|||
AsyncTrigger printDetailedTeamsInfo;
|
||||
Reference<LocalitySet> storageServerSet;
|
||||
|
||||
DDTeamCollection(Database const& cx,
|
||||
DDTeamCollection(const std::shared_ptr<IDDTxnProcessor>& dbProcessor,
|
||||
UID distributorId,
|
||||
MoveKeysLock const& lock,
|
||||
PromiseStream<RelocateShard> const& output,
|
||||
|
|
|
@ -36,6 +36,8 @@ public:
|
|||
struct SourceServers {
|
||||
std::vector<UID> srcServers, completeSources; // the same as RelocateData.src, RelocateData.completeSources;
|
||||
};
|
||||
virtual Database getDb() const = 0;
|
||||
virtual bool isMocked() const = 0;
|
||||
// get the source server list and complete source server list for range
|
||||
virtual Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) = 0;
|
||||
|
||||
|
@ -89,6 +91,9 @@ public:
|
|||
DDTxnProcessor() = default;
|
||||
explicit DDTxnProcessor(Database cx) : cx(cx) {}
|
||||
|
||||
Database getDb() const override { return cx; };
|
||||
bool isMocked() const override { return false; };
|
||||
|
||||
Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) override;
|
||||
|
||||
// Call NativeAPI implementation directly
|
||||
|
|
Loading…
Reference in New Issue