Merge pull request #7555 from sfc-gh-xwang/feature/dd-refactor-incremental
[DD Testability] move takeMoveKeysLock to DDTxnProcessor
commit 6be20d08ae
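In outline, the change routes MoveKeysLock acquisition through the txn-processor abstraction instead of having the data distribution bootstrap call MoveKeys directly. A condensed sketch of the resulting call chain, distilled from the hunks below (other interface methods and all details elided; this is not the complete code):

// Condensed sketch only — see the hunks below for the actual diff.
class IDDTxnProcessor {
public:
	virtual ~IDDTxnProcessor() = default;
	// Default body lets a mock/test implementation skip the real transaction.
	[[nodiscard]] virtual Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); }
};

class DDTxnProcessor : public IDDTxnProcessor {
	Database cx;
public:
	explicit DDTxnProcessor(Database cx) : cx(cx) {}
	Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const override {
		return ::takeMoveKeysLock(cx, ddId); // the real implementation from MoveKeys.actor.h
	}
};

struct DataDistributor {
	UID ddId;
	MoveKeysLock lock;
	std::shared_ptr<IDDTxnProcessor> txnProcessor;
	// The bootstrap now waits on this instead of calling ::takeMoveKeysLock(cx, ddId) itself.
	Future<Void> takeMoveKeysLock() { return store(lock, txnProcessor->takeMoveKeysLock(ddId)); }
};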
@@ -3029,8 +3029,8 @@ public:
 	    .trackLatest(self->primary ? "TotalDataInFlight"
 	                               : "TotalDataInFlightRemote"); // This trace event's trackLatest
 	                                                             // lifetime is controlled by
-	                                                             // DataDistributorData::totalDataInFlightEventHolder or
-	                                                             // DataDistributorData::totalDataInFlightRemoteEventHolder.
+	                                                             // DataDistributor::totalDataInFlightEventHolder or
+	                                                             // DataDistributor::totalDataInFlightRemoteEventHolder.
 	                                                             // The track latest key we use here must match the key used in
 	                                                             // the holder.
@@ -97,4 +97,8 @@ Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(
 Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> DDTxnProcessor::getServerListAndProcessClasses() {
 	Transaction tr(cx);
 	return NativeAPI::getServerListAndProcessClasses(&tr);
 }
+
+Future<MoveKeysLock> DDTxnProcessor::takeMoveKeysLock(UID ddId) const {
+	return ::takeMoveKeysLock(cx, ddId);
+}
@@ -36,7 +36,6 @@
 #include "fdbserver/FDBExecHelper.actor.h"
 #include "fdbserver/IKeyValueStore.h"
 #include "fdbserver/Knobs.h"
-#include "fdbserver/MoveKeys.actor.h"
 #include "fdbserver/QuietDatabase.h"
 #include "fdbserver/ServerDBInfo.h"
 #include "fdbserver/TenantCache.h"
@@ -557,30 +556,38 @@ ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab
 	}
 }
 
-struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
+struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
 	Reference<AsyncVar<ServerDBInfo> const> dbInfo;
 	UID ddId;
 	PromiseStream<Future<Void>> addActor;
 
+	// State initialized when bootstrap
 	DDTeamCollection* teamCollection;
+	std::shared_ptr<IDDTxnProcessor> txnProcessor;
+	MoveKeysLock lock;
+
 	Reference<EventCacheHolder> initialDDEventHolder;
 	Reference<EventCacheHolder> movingDataEventHolder;
 	Reference<EventCacheHolder> totalDataInFlightEventHolder;
 	Reference<EventCacheHolder> totalDataInFlightRemoteEventHolder;
 
-	DataDistributorData(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
-	  : dbInfo(db), ddId(id), teamCollection(nullptr),
+	DataDistributor(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
+	  : dbInfo(db), ddId(id), teamCollection(nullptr), txnProcessor(nullptr),
 	    initialDDEventHolder(makeReference<EventCacheHolder>("InitialDD")),
 	    movingDataEventHolder(makeReference<EventCacheHolder>("MovingData")),
 	    totalDataInFlightEventHolder(makeReference<EventCacheHolder>("TotalDataInFlight")),
 	    totalDataInFlightRemoteEventHolder(makeReference<EventCacheHolder>("TotalDataInFlightRemote")) {}
+
+	Future<Void> takeMoveKeysLock() { return store(lock, txnProcessor->takeMoveKeysLock(ddId)); }
 };
 
 // Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
-ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
+ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
                                     PromiseStream<GetMetricsListRequest> getShardMetricsList,
                                     const DDEnabledState* ddEnabledState) {
 	state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DataDistributionLaunch, LockAware::True);
 	cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
+	self->txnProcessor = std::shared_ptr<IDDTxnProcessor>(new DDTxnProcessor(cx));
 
 	// cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*)
 	// &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); ASSERT( cx->locationCacheSize ==
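The new DataDistributor::takeMoveKeysLock() leans on flow's store() helper, which waits on a future and assigns its result into a member. Roughly, and only as a sketch of the idea (I believe the real helper lives in flow/genericactors.actor.h; this is not a verbatim copy):

// Sketch of what store() does (assumes the FDB actor compiler).
ACTOR template <class T>
Future<Void> store(T& out, Future<T> what) {
	T value = wait(what);
	out = value;
	return Void();
}

So wait(self->takeMoveKeysLock()) leaves the acquired lock in self->lock, which the later hunks read as self->lock in place of the old local "state MoveKeysLock lock".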
@@ -588,11 +595,11 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 	// );
 
 	// wait(debugCheckCoalescing(cx));
+	// FIXME: wrap the bootstrap process into class DataDistributor
 	state std::vector<Optional<Key>> primaryDcId;
 	state std::vector<Optional<Key>> remoteDcIds;
 	state DatabaseConfiguration configuration;
 	state Reference<InitialDataDistribution> initData;
-	state MoveKeysLock lock;
 	state Reference<DDTeamCollection> primaryTeamCollection;
 	state Reference<DDTeamCollection> remoteTeamCollection;
 	state bool trackerCancelled;
@@ -607,8 +614,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 	try {
 		loop {
 			TraceEvent("DDInitTakingMoveKeysLock", self->ddId).log();
-			MoveKeysLock lock_ = wait(takeMoveKeysLock(cx, self->ddId));
-			lock = lock_;
+			wait(self->takeMoveKeysLock());
 			TraceEvent("DDInitTookMoveKeysLock", self->ddId).log();
 
 			DatabaseConfiguration configuration_ = wait(getDatabaseConfiguration(cx));
@@ -657,7 +663,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 			Reference<InitialDataDistribution> initData_ = wait(getInitialDataDistribution(
 			    cx,
 			    self->ddId,
-			    lock,
+			    self->lock,
 			    configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(),
 			    ddEnabledState));
 			initData = initData_;
@@ -841,7 +847,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 				    ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
 			}
 
-			actors.push_back(pollMoveKeysLock(cx, lock, ddEnabledState));
+			actors.push_back(pollMoveKeysLock(cx, self->lock, ddEnabledState));
 			actors.push_back(reportErrorsExcept(dataDistributionTracker(initData,
 			                                                            cx,
 			                                                            output,
@@ -867,7 +873,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 			                                                   processingWiggle,
 			                                                   tcis,
 			                                                   shardsAffectedByTeamFailure,
-			                                                   lock,
+			                                                   self->lock,
 			                                                   getAverageShardBytes,
 			                                                   getUnhealthyRelocationCount.getFuture(),
 			                                                   self->ddId,
@@ -882,7 +888,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 			primaryTeamCollection = makeReference<DDTeamCollection>(
 			    cx,
 			    self->ddId,
-			    lock,
+			    self->lock,
 			    output,
 			    shardsAffectedByTeamFailure,
 			    configuration,
@@ -903,7 +909,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 				remoteTeamCollection =
 				    makeReference<DDTeamCollection>(cx,
 				                                    self->ddId,
-				                                    lock,
+				                                    self->lock,
 				                                    output,
 				                                    shardsAffectedByTeamFailure,
 				                                    configuration,
@@ -973,9 +979,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 		if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
 			TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
 			wait(removeKeysFromFailedServer(
-			    cx, removeFailedServer.getFuture().get(), teamForDroppedRange, lock, ddEnabledState));
+			    cx, removeFailedServer.getFuture().get(), teamForDroppedRange, self->lock, ddEnabledState));
 			Optional<UID> tssPairID;
-			wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), tssPairID, lock, ddEnabledState));
+			wait(removeStorageServer(
+			    cx, removeFailedServer.getFuture().get(), tssPairID, self->lock, ddEnabledState));
 		} else {
 			if (err.code() != error_code_movekeys_conflict) {
 				throw err;
@@ -1390,7 +1397,7 @@ ACTOR Future<Void> ddSnapCreate(
 }
 
 ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req,
-                                          Reference<DataDistributorData> self,
+                                          Reference<DataDistributor> self,
                                           Database cx) {
 	TraceEvent("DDExclusionSafetyCheckBegin", self->ddId).log();
 	std::vector<StorageServerInterface> ssis = wait(getStorageServers(cx));
@@ -1482,7 +1489,7 @@ static int64_t getMedianShardSize(VectorRef<DDMetricsRef> metricVec) {
 	return metricVec[metricVec.size() / 2].shardBytes;
 }
 
-GetStorageWigglerStateReply getStorageWigglerStates(Reference<DataDistributorData> self) {
+GetStorageWigglerStateReply getStorageWigglerStates(Reference<DataDistributor> self) {
 	GetStorageWigglerStateReply reply;
 	if (self->teamCollection) {
 		std::tie(reply.primary, reply.lastStateChangePrimary) = self->teamCollection->getStorageWigglerState();
@@ -1520,7 +1527,7 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
 }
 
 ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<ServerDBInfo> const> db) {
-	state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
+	state Reference<DataDistributor> self(new DataDistributor(db, di.id()));
 	state Future<Void> collection = actorCollection(self->addActor.getFuture());
 	state PromiseStream<GetMetricsListRequest> getShardMetricsList;
 	state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
@@ -2315,7 +2315,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
 		    .detail("PriorityTeam0Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_0_LEFT])
 		    .detail("PrioritySplitShard", self.priority_relocations[SERVER_KNOBS->PRIORITY_SPLIT_SHARD])
 		    .trackLatest("MovingData"); // This trace event's trackLatest lifetime is controlled by
-		                                // DataDistributorData::movingDataEventHolder. The track latest
+		                                // DataDistributor::movingDataEventHolder. The track latest
 		                                // key we use here must match the key used in the holder.
 	}
 	when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
@@ -23,7 +23,13 @@
 
 #include "fdbserver/Knobs.h"
 #include "fdbserver/DataDistribution.actor.h"
+#include "fdbserver/MoveKeys.actor.h"
 
+/* Testability Contract:
+ * a. The DataDistributor has to use this interface to interact with data-plane (aka. run transaction), because the
+ * testability benefits from a mock implementation; b. Other control-plane roles should consider providing its own
+ * TxnProcessor interface to provide testability, for example, Ratekeeper.
+ * */
 class IDDTxnProcessor {
 public:
 	struct SourceServers {
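The Testability Contract above is the point of the interface: a unit test can substitute an in-memory processor so the data distributor can be exercised without a live cluster. A hypothetical mock (the name DDMockTxnProcessor and its behavior are illustrative only, not part of this diff) might look like:

// Illustrative only — not part of this commit. A test double for IDDTxnProcessor
// that answers control-plane queries from in-memory data instead of running
// real transactions.
class DDMockTxnProcessor : public IDDTxnProcessor {
	std::vector<std::pair<StorageServerInterface, ProcessClass>> servers; // canned test data
public:
	Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() override {
		return servers; // returns an immediately-ready future over the canned data
	}
	Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const override {
		return MoveKeysLock(); // no real lock is needed when there is no real cluster
	}
	// The remaining pure-virtual methods (e.g. getSourceServersForRange, whose full
	// signature is outside the hunks shown here) would be stubbed the same way.
};

A test harness would then install it with something like self->txnProcessor = std::make_shared<DDMockTxnProcessor>(), mirroring how dataDistribution() installs the real DDTxnProcessor in the hunk above.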
@@ -36,6 +42,8 @@ public:
 	virtual Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() = 0;
 
 	virtual ~IDDTxnProcessor() = default;
+
+	[[nodiscard]] virtual Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); }
 };
 
 class DDTxnProcessorImpl;
@@ -53,6 +61,8 @@ public:
 
 	// Call NativeAPI implementation directly
 	Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() override;
+
+	Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const override;
 };
 
 // run mock transaction
@@ -26,6 +26,7 @@
 
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbclient/RunTransaction.actor.h"
 #include "fdbserver/DDTxnProcessor.h"
 #include "fdbserver/Knobs.h"
 #include "fdbserver/LogSystem.h"
+#include "fdbserver/MoveKeys.actor.h"