From f5a81154efc3e3708088dc248162ba91284cea7a Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 8 Jul 2022 14:11:31 -0700 Subject: [PATCH 1/2] move takeMoveKeysLock to DDTxnProcessor --- fdbserver/DDTeamCollection.actor.cpp | 4 +- fdbserver/DDTxnProcessor.actor.cpp | 4 ++ fdbserver/DataDistribution.actor.cpp | 43 +++++++++++-------- fdbserver/DataDistributionQueue.actor.cpp | 2 +- fdbserver/include/fdbserver/DDTxnProcessor.h | 5 +++ .../fdbserver/DataDistribution.actor.h | 1 + 6 files changed, 38 insertions(+), 21 deletions(-) diff --git a/fdbserver/DDTeamCollection.actor.cpp b/fdbserver/DDTeamCollection.actor.cpp index 6159a878a1..b121056c35 100644 --- a/fdbserver/DDTeamCollection.actor.cpp +++ b/fdbserver/DDTeamCollection.actor.cpp @@ -3029,8 +3029,8 @@ public: .trackLatest(self->primary ? "TotalDataInFlight" : "TotalDataInFlightRemote"); // This trace event's trackLatest // lifetime is controlled by - // DataDistributorData::totalDataInFlightEventHolder or - // DataDistributorData::totalDataInFlightRemoteEventHolder. + // DataDistributor::totalDataInFlightEventHolder or + // DataDistributor::totalDataInFlightRemoteEventHolder. // The track latest key we use here must match the key used in // the holder. diff --git a/fdbserver/DDTxnProcessor.actor.cpp b/fdbserver/DDTxnProcessor.actor.cpp index 499da9d734..187b31582b 100644 --- a/fdbserver/DDTxnProcessor.actor.cpp +++ b/fdbserver/DDTxnProcessor.actor.cpp @@ -97,4 +97,8 @@ Future DDTxnProcessor::getSourceServersForRange( Future>> DDTxnProcessor::getServerListAndProcessClasses() { Transaction tr(cx); return NativeAPI::getServerListAndProcessClasses(&tr); +} + +Future DDTxnProcessor::takeMoveKeysLock(UID ddId) const { + return ::takeMoveKeysLock(cx, ddId); } \ No newline at end of file diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index c4e099d6ee..a5f9e342e6 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -36,7 +36,6 @@ #include "fdbserver/FDBExecHelper.actor.h" #include "fdbserver/IKeyValueStore.h" #include "fdbserver/Knobs.h" -#include "fdbserver/MoveKeys.actor.h" #include "fdbserver/QuietDatabase.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/TenantCache.h" @@ -557,30 +556,38 @@ ACTOR Future pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab } } -struct DataDistributorData : NonCopyable, ReferenceCounted { +struct DataDistributor : NonCopyable, ReferenceCounted { Reference const> dbInfo; UID ddId; PromiseStream> addActor; + + // State initialized when bootstrap DDTeamCollection* teamCollection; + std::shared_ptr txnProcessor; + MoveKeysLock lock; + Reference initialDDEventHolder; Reference movingDataEventHolder; Reference totalDataInFlightEventHolder; Reference totalDataInFlightRemoteEventHolder; - DataDistributorData(Reference const> const& db, UID id) - : dbInfo(db), ddId(id), teamCollection(nullptr), + DataDistributor(Reference const> const& db, UID id) + : dbInfo(db), ddId(id), teamCollection(nullptr), txnProcessor(nullptr), initialDDEventHolder(makeReference("InitialDD")), movingDataEventHolder(makeReference("MovingData")), totalDataInFlightEventHolder(makeReference("TotalDataInFlight")), totalDataInFlightRemoteEventHolder(makeReference("TotalDataInFlightRemote")) {} + + Future takeMoveKeysLock() { return store(lock, txnProcessor->takeMoveKeysLock(ddId)); } }; // Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection -ACTOR Future dataDistribution(Reference self, +ACTOR Future dataDistribution(Reference self, PromiseStream getShardMetricsList, const DDEnabledState* ddEnabledState) { state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DataDistributionLaunch, LockAware::True); cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE; + self->txnProcessor = std::shared_ptr(new DDTxnProcessor(cx)); // cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*) // &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); ASSERT( cx->locationCacheSize == @@ -588,11 +595,11 @@ ACTOR Future dataDistribution(Reference self, // ); // wait(debugCheckCoalescing(cx)); + // FIXME: wrap the bootstrap process into class DataDistributor state std::vector> primaryDcId; state std::vector> remoteDcIds; state DatabaseConfiguration configuration; state Reference initData; - state MoveKeysLock lock; state Reference primaryTeamCollection; state Reference remoteTeamCollection; state bool trackerCancelled; @@ -607,8 +614,7 @@ ACTOR Future dataDistribution(Reference self, try { loop { TraceEvent("DDInitTakingMoveKeysLock", self->ddId).log(); - MoveKeysLock lock_ = wait(takeMoveKeysLock(cx, self->ddId)); - lock = lock_; + wait(self->takeMoveKeysLock()); TraceEvent("DDInitTookMoveKeysLock", self->ddId).log(); DatabaseConfiguration configuration_ = wait(getDatabaseConfiguration(cx)); @@ -657,7 +663,7 @@ ACTOR Future dataDistribution(Reference self, Reference initData_ = wait(getInitialDataDistribution( cx, self->ddId, - lock, + self->lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector>(), ddEnabledState)); initData = initData_; @@ -841,7 +847,7 @@ ACTOR Future dataDistribution(Reference self, ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors())); } - actors.push_back(pollMoveKeysLock(cx, lock, ddEnabledState)); + actors.push_back(pollMoveKeysLock(cx, self->lock, ddEnabledState)); actors.push_back(reportErrorsExcept(dataDistributionTracker(initData, cx, output, @@ -867,7 +873,7 @@ ACTOR Future dataDistribution(Reference self, processingWiggle, tcis, shardsAffectedByTeamFailure, - lock, + self->lock, getAverageShardBytes, getUnhealthyRelocationCount.getFuture(), self->ddId, @@ -882,7 +888,7 @@ ACTOR Future dataDistribution(Reference self, primaryTeamCollection = makeReference( cx, self->ddId, - lock, + self->lock, output, shardsAffectedByTeamFailure, configuration, @@ -903,7 +909,7 @@ ACTOR Future dataDistribution(Reference self, remoteTeamCollection = makeReference(cx, self->ddId, - lock, + self->lock, output, shardsAffectedByTeamFailure, configuration, @@ -973,9 +979,10 @@ ACTOR Future dataDistribution(Reference self, if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) { TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err); wait(removeKeysFromFailedServer( - cx, removeFailedServer.getFuture().get(), teamForDroppedRange, lock, ddEnabledState)); + cx, removeFailedServer.getFuture().get(), teamForDroppedRange, self->lock, ddEnabledState)); Optional tssPairID; - wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), tssPairID, lock, ddEnabledState)); + wait(removeStorageServer( + cx, removeFailedServer.getFuture().get(), tssPairID, self->lock, ddEnabledState)); } else { if (err.code() != error_code_movekeys_conflict) { throw err; @@ -1390,7 +1397,7 @@ ACTOR Future ddSnapCreate( } ACTOR Future ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req, - Reference self, + Reference self, Database cx) { TraceEvent("DDExclusionSafetyCheckBegin", self->ddId).log(); std::vector ssis = wait(getStorageServers(cx)); @@ -1482,7 +1489,7 @@ static int64_t getMedianShardSize(VectorRef metricVec) { return metricVec[metricVec.size() / 2].shardBytes; } -GetStorageWigglerStateReply getStorageWigglerStates(Reference self) { +GetStorageWigglerStateReply getStorageWigglerStates(Reference self) { GetStorageWigglerStateReply reply; if (self->teamCollection) { std::tie(reply.primary, reply.lastStateChangePrimary) = self->teamCollection->getStorageWigglerState(); @@ -1520,7 +1527,7 @@ ACTOR Future ddGetMetrics(GetDataDistributorMetricsRequest req, } ACTOR Future dataDistributor(DataDistributorInterface di, Reference const> db) { - state Reference self(new DataDistributorData(db, di.id())); + state Reference self(new DataDistributor(db, di.id())); state Future collection = actorCollection(self->addActor.getFuture()); state PromiseStream getShardMetricsList; state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True); diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 299dff4d92..a574989302 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -2315,7 +2315,7 @@ ACTOR Future dataDistributionQueue(Database cx, .detail("PriorityTeam0Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_0_LEFT]) .detail("PrioritySplitShard", self.priority_relocations[SERVER_KNOBS->PRIORITY_SPLIT_SHARD]) .trackLatest("MovingData"); // This trace event's trackLatest lifetime is controlled by - // DataDistributorData::movingDataEventHolder. The track latest + // DataDistributor::movingDataEventHolder. The track latest // key we use here must match the key used in the holder. } when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator diff --git a/fdbserver/include/fdbserver/DDTxnProcessor.h b/fdbserver/include/fdbserver/DDTxnProcessor.h index 506e8b28cf..a2e258510e 100644 --- a/fdbserver/include/fdbserver/DDTxnProcessor.h +++ b/fdbserver/include/fdbserver/DDTxnProcessor.h @@ -23,6 +23,7 @@ #include "fdbserver/Knobs.h" #include "fdbserver/DataDistribution.actor.h" +#include "fdbserver/MoveKeys.actor.h" class IDDTxnProcessor { public: @@ -36,6 +37,8 @@ public: virtual Future>> getServerListAndProcessClasses() = 0; virtual ~IDDTxnProcessor() = default; + + virtual Future takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); } }; class DDTxnProcessorImpl; @@ -53,6 +56,8 @@ public: // Call NativeAPI implementation directly Future>> getServerListAndProcessClasses() override; + + Future takeMoveKeysLock(UID ddId) const override; }; // run mock transaction diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h index 117ede61d0..679864140e 100644 --- a/fdbserver/include/fdbserver/DataDistribution.actor.h +++ b/fdbserver/include/fdbserver/DataDistribution.actor.h @@ -26,6 +26,7 @@ #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/RunTransaction.actor.h" +#include "fdbserver/DDTxnProcessor.h" #include "fdbserver/Knobs.h" #include "fdbserver/LogSystem.h" #include "fdbserver/MoveKeys.actor.h" From 9cead35911bc6f98ff55b1cf2a00bf754f6d055b Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 8 Jul 2022 14:19:14 -0700 Subject: [PATCH 2/2] add contract comments --- fdbserver/include/fdbserver/DDTxnProcessor.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fdbserver/include/fdbserver/DDTxnProcessor.h b/fdbserver/include/fdbserver/DDTxnProcessor.h index a2e258510e..883136af7f 100644 --- a/fdbserver/include/fdbserver/DDTxnProcessor.h +++ b/fdbserver/include/fdbserver/DDTxnProcessor.h @@ -25,6 +25,11 @@ #include "fdbserver/DataDistribution.actor.h" #include "fdbserver/MoveKeys.actor.h" +/* Testability Contract: + * a. The DataDistributor has to use this interface to interact with data-plane (aka. run transaction), because the + * testability benefits from a mock implementation; b. Other control-plane roles should consider providing its own + * TxnProcessor interface to provide testability, for example, Ratekeeper. + * */ class IDDTxnProcessor { public: struct SourceServers { @@ -38,7 +43,7 @@ public: virtual ~IDDTxnProcessor() = default; - virtual Future takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); } + [[nodiscard]] virtual Future takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); } }; class DDTxnProcessorImpl;