Merge pull request #7757 from sfc-gh-xwang/refactor/main/enableState
[DD Testability] Encapsulate shared states in DD into DDSharedContext
This commit is contained in:
commit 75a52afb31
@@ -29,6 +29,7 @@
 #include "fdbrpc/sim_validation.h"
 #include "fdbclient/SystemData.h"
 #include "fdbserver/DataDistribution.actor.h"
+#include "fdbserver/DDSharedContext.h"
 #include "fdbclient/DatabaseContext.h"
 #include "fdbserver/MoveKeys.actor.h"
 #include "fdbserver/Knobs.h"
@@ -516,7 +517,7 @@ ACTOR Future<Void> dataDistributionRelocator(struct DDQueue* self,
                                              Future<Void> prevCleanup,
                                              const DDEnabledState* ddEnabledState);

-struct DDQueue {
+struct DDQueue : public IDDRelocationQueue {
     struct DDDataMove {
         DDDataMove() = default;
         explicit DDDataMove(UID id) : id(id) {}
@@ -734,8 +735,9 @@ struct DDQueue {
             FutureStream<RelocateShard> input,
             PromiseStream<GetMetricsRequest> getShardMetrics,
             PromiseStream<GetTopKMetricsRequest> getTopKMetrics)
-      : distributorId(mid), lock(lock), cx(cx), txnProcessor(new DDTxnProcessor(cx)), teamCollections(teamCollections),
-        shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes),
+      : IDDRelocationQueue(), distributorId(mid), lock(lock), cx(cx), txnProcessor(new DDTxnProcessor(cx)),
+        teamCollections(teamCollections), shardsAffectedByTeamFailure(sABTF),
+        getAverageShardBytes(getAverageShardBytes),
         startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
         finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
         cleanUpDataMoveParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
@@ -1312,6 +1314,8 @@ struct DDQueue {
         };
         return recurring(f, SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL);
     }
+
+    int getUnhealthyRelocationCount() override { return unhealthyRelocations; }
 };

 ACTOR Future<Void> cancelDataMove(struct DDQueue* self, KeyRange range, const DDEnabledState* ddEnabledState) {
@@ -2489,7 +2493,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
                 }
                 when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
                 when(wait(waitForAll(ddQueueFutures))) {}
-                when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
+                when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) {
+                    r.send(self.getUnhealthyRelocationCount());
+                }
             }
         }
     } catch (Error& e) {
@@ -2502,6 +2508,8 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
     }
 }

+ACTOR Future<Void> dataDistributionQueue(Reference<DDSharedContext> context, Database cx);
+
 TEST_CASE("/DataDistribution/DDQueue/ServerCounterTrace") {
     state double duration = 2.5 * SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL;
     state DDQueue self;
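
The pattern behind the DDQueue changes above: DDQueue now implements IDDRelocationQueue, so the unhealthy-relocation counter is read through a synchronous virtual call rather than only through the getUnhealthyRelocationCount promise stream. A minimal standalone sketch of that shape in plain C++ follows; IRelocationQueue, RelocationQueue, startRelocation and finishRelocation are illustrative stand-ins, not the FDB types.

#include <cassert>

// Narrow interface seen by the caller side; mirrors IDDRelocationQueue's synchronous getter.
struct IRelocationQueue {
    virtual int getUnhealthyRelocationCount() = 0;
    virtual ~IRelocationQueue() = default;
};

// The queue keeps the counter to itself and publishes it only through the interface.
class RelocationQueue : public IRelocationQueue {
    int unhealthyRelocations = 0;

public:
    void startRelocation(bool unhealthy) {
        if (unhealthy)
            ++unhealthyRelocations;
    }
    void finishRelocation(bool unhealthy) {
        if (unhealthy)
            --unhealthyRelocations;
    }
    int getUnhealthyRelocationCount() override { return unhealthyRelocations; }
};

int main() {
    RelocationQueue queue;
    queue.startRelocation(/*unhealthy=*/true);
    queue.startRelocation(/*unhealthy=*/false);

    IRelocationQueue& iface = queue; // what a shared context would hold
    assert(iface.getUnhealthyRelocationCount() == 1);

    queue.finishRelocation(/*unhealthy=*/true);
    assert(iface.getUnhealthyRelocationCount() == 0);
    return 0;
}

In the commit itself, the dataDistributionQueue actor answers getUnhealthyRelocationCount requests by calling self.getUnhealthyRelocationCount(), the same delegation shown in the sketch.
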
@@ -21,6 +21,7 @@
 #include "fdbrpc/FailureMonitor.h"
 #include "fdbclient/SystemData.h"
 #include "fdbserver/DataDistribution.actor.h"
+#include "fdbserver/DDSharedContext.h"
 #include "fdbserver/Knobs.h"
 #include "fdbclient/DatabaseContext.h"
 #include "flow/ActorCollection.h"
@@ -68,7 +69,7 @@ ACTOR Future<Void> updateMaxShardSize(Reference<AsyncVar<int64_t>> dbSizeEstimat
     }
 }

-struct DataDistributionTracker {
+struct DataDistributionTracker : public IDDShardTracker {
     Database cx;
     UID distributorId;
     KeyRangeMap<ShardTrackedData>* shards;
@@ -125,16 +126,18 @@ struct DataDistributionTracker {
                             Reference<AsyncVar<bool>> anyZeroHealthyTeams,
                             KeyRangeMap<ShardTrackedData>* shards,
                             bool* trackerCancelled)
-      : cx(cx), distributorId(distributorId), shards(shards), sizeChanges(false), systemSizeEstimate(0),
-        dbSizeEstimate(new AsyncVar<int64_t>()), maxShardSize(new AsyncVar<Optional<int64_t>>()), output(output),
-        shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), readyToStart(readyToStart),
+      : IDDShardTracker(), cx(cx), distributorId(distributorId), shards(shards), sizeChanges(false),
+        systemSizeEstimate(0), dbSizeEstimate(new AsyncVar<int64_t>()), maxShardSize(new AsyncVar<Optional<int64_t>>()),
+        output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), readyToStart(readyToStart),
         anyZeroHealthyTeams(anyZeroHealthyTeams), trackerCancelled(trackerCancelled) {}

-    ~DataDistributionTracker() {
+    ~DataDistributionTracker() override {
         *trackerCancelled = true;
         // Cancel all actors so they aren't waiting on sizeChanged broken promise
         sizeChanges.clear(false);
     }
+
+    double getAverageShardBytes() override { return maxShardSize->get().get() / 2.0; }
 };

 void restartShardTrackers(DataDistributionTracker* self,
@@ -1025,9 +1028,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
     initData = Reference<InitialDataDistribution>();

     loop choose {
-        when(Promise<int64_t> req = waitNext(getAverageShardBytes)) {
-            req.send(self.maxShardSize->get().get() / 2);
-        }
+        when(Promise<int64_t> req = waitNext(getAverageShardBytes)) { req.send(self.getAverageShardBytes()); }
         when(wait(loggingTrigger)) {
             TraceEvent("DDTrackerStats", self.distributorId)
                 .detail("Shards", self.shards->size())
@@ -1056,3 +1057,9 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
         throw e;
     }
 }
+
+// Not used yet
+ACTOR Future<Void> dataDistributionTracker(Reference<DDSharedContext> context,
+                                           Reference<InitialDataDistribution> initData,
+                                           Database cx,
+                                           KeyRangeMap<ShardTrackedData>* shards);
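
On the tracker side, getAverageShardBytes() becomes a synchronous override of IDDShardTracker, and the waitNext handler in the actor loop simply delegates to it. That makes the value trivially stubbable in a unit test. A rough standalone sketch, again with plain C++ stand-ins (IShardTracker, ShardTracker and FixedShardTracker are illustrative names, not the FDB classes):

#include <cassert>
#include <cstdint>
#include <optional>

// Mirrors the synchronous part of IDDShardTracker.
struct IShardTracker {
    virtual double getAverageShardBytes() = 0;
    virtual ~IShardTracker() = default;
};

// Production-like tracker: derives the average from the current max shard size.
class ShardTracker : public IShardTracker {
    std::optional<int64_t> maxShardSize;

public:
    void setMaxShardSize(int64_t bytes) { maxShardSize = bytes; }
    double getAverageShardBytes() override { return maxShardSize.value() / 2.0; }
};

// Test stub: returns a fixed value, no async machinery required.
struct FixedShardTracker : IShardTracker {
    double value;
    explicit FixedShardTracker(double v) : value(v) {}
    double getAverageShardBytes() override { return value; }
};

int main() {
    ShardTracker tracker;
    tracker.setMaxShardSize(250'000'000);
    assert(tracker.getAverageShardBytes() == 125'000'000.0);

    FixedShardTracker stub(42.0);
    IShardTracker* t = &stub; // what a unit test would hand to the component under test
    assert(t->getAverageShardBytes() == 42.0);
    return 0;
}
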
@@ -49,7 +49,7 @@
 #include "flow/serialize.h"
 #include "flow/Trace.h"
 #include "flow/UnitTest.h"
-
+#include "fdbserver/DDSharedContext.h"
 #include "flow/actorcompiler.h" // This must be the last #include.

 void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority) {
@@ -0,0 +1,35 @@
+/*
+ * DDRelocationQueue.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FOUNDATIONDB_DDRELOCATIONQUEUE_H
+#define FOUNDATIONDB_DDRELOCATIONQUEUE_H
+
+#include "DataDistribution.actor.h"
+// send request/signal to DDRelocationQueue through interface
+// call synchronous method from components outside DDRelocationQueue
+struct IDDRelocationQueue {
+    PromiseStream<RelocateShard> relocationProducer, relocationConsumer; // FIXME(xwang): not used yet
+    // PromiseStream<Promise<int>> getUnhealthyRelocationCount; // FIXME(xwang): change it to a synchronous call
+
+    virtual int getUnhealthyRelocationCount() = 0;
+    virtual ~IDDRelocationQueue() = default;
+    ;
+};
+
+#endif // FOUNDATIONDB_DDRELOCATIONQUEUE_H
@@ -0,0 +1,40 @@
+/*
+ * DDShardTracker.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FOUNDATIONDB_DDSHARDTRACKER_H
+#define FOUNDATIONDB_DDSHARDTRACKER_H
+#include "DataDistribution.actor.h"
+
+// send request/signal to DDTracker through interface
+// call synchronous method from components outside DDShardTracker
+struct IDDShardTracker {
+    // FIXME: the streams are not used yet
+    Promise<Void> readyToStart;
+    PromiseStream<GetMetricsRequest> getShardMetrics;
+    PromiseStream<GetTopKMetricsRequest> getTopKMetrics;
+    PromiseStream<GetMetricsListRequest> getShardMetricsList;
+    PromiseStream<KeyRange> restartShardTracker;
+
+    // PromiseStream<Promise<int64_t>> averageShardBytes; // FIXME(xwang): change it to a synchronous call
+
+    virtual double getAverageShardBytes() = 0;
+    virtual ~IDDShardTracker() = default;
+};
+
+#endif // FOUNDATIONDB_DDSHARDTRACKER_H
@@ -0,0 +1,70 @@
+/*
+ * DDSharedContext.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FOUNDATIONDB_DDSHAREDCONTEXT_H
+#define FOUNDATIONDB_DDSHAREDCONTEXT_H
+#include "fdbserver/MoveKeys.actor.h"
+#include "fdbserver/ShardsAffectedByTeamFailure.h"
+#include "fdbserver/DDShardTracker.h"
+#include "fdbserver/DDRelocationQueue.h"
+#include "fdbserver/DDTeamCollection.h"
+
+// The common info shared by all DD components. Normally the DD components should share the reference to the same
+// context.
+// NOTE: We should avoid the shared class become an insanely large class, think twice before add member to it.
+class DDSharedContext : public ReferenceCounted<DDSharedContext> {
+    // FIXME(xwang) mark fields privates
+public:
+    std::unique_ptr<DDEnabledState>
+        ddEnabledState; // Note: don't operate directly because it's shared with snapshot server
+    IDDShardTracker* shardTracker = nullptr;
+    IDDRelocationQueue* relocationQueue = nullptr;
+    std::vector<IDDTeamCollection*> teamCollections;
+
+    // public:
+    UID ddId;
+    MoveKeysLock lock;
+    bool trackerCancelled = false;
+    DatabaseConfiguration configuration;
+
+    Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
+    Reference<AsyncVar<bool>> processingUnhealthy, processingWiggle;
+
+    DDSharedContext() = default;
+
+    DDSharedContext(UID id)
+      : ddEnabledState(new DDEnabledState), ddId(id), shardsAffectedByTeamFailure(new ShardsAffectedByTeamFailure),
+        processingUnhealthy(new AsyncVar<bool>(false)), processingWiggle(new AsyncVar<bool>(false)) {}
+
+    UID id() const { return ddId; }
+
+    void markTrackerCancelled() { trackerCancelled = true; }
+
+    bool isTrackerCancelled() const { return trackerCancelled; }
+
+    decltype(auto) usableRegions() const { return configuration.usableRegions; }
+
+    bool isDDEnabled() const { return ddEnabledState->isDDEnabled(); };
+
+    void proposeRelocation(const RelocateShard& rs) const { return relocationQueue->relocationProducer.send(rs); }
+
+    void requestRestartShardTracker(KeyRange keys) const { return shardTracker->restartShardTracker.send(keys); }
+};
+
+#endif // FOUNDATIONDB_DDSHAREDCONTEXT_H
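
Taken together, DDSharedContext centralizes the cross-component state (distributor id, MoveKeysLock, DDEnabledState, team-failure bookkeeping) and holds non-owning pointers to the narrow component interfaces, so the components reach each other through one reference-counted object. A simplified standalone model of that wiring; SharedContext, ToyQueue and ToyTracker are illustrative stand-ins, not the FDB classes:

#include <cassert>
#include <cstdint>
#include <memory>

struct IRelocationQueue {
    virtual int getUnhealthyRelocationCount() = 0;
    virtual ~IRelocationQueue() = default;
};
struct IShardTracker {
    virtual double getAverageShardBytes() = 0;
    virtual ~IShardTracker() = default;
};

// Reference-counted in the real code; a plain shared_ptr here.
// Holds non-owning pointers: the components register themselves and keep ownership.
struct SharedContext {
    uint64_t ddId = 0;
    bool trackerCancelled = false;
    IShardTracker* shardTracker = nullptr;
    IRelocationQueue* relocationQueue = nullptr;

    explicit SharedContext(uint64_t id) : ddId(id) {}
    void markTrackerCancelled() { trackerCancelled = true; }
    bool isTrackerCancelled() const { return trackerCancelled; }
};

struct ToyQueue : IRelocationQueue {
    int getUnhealthyRelocationCount() override { return 3; }
};
struct ToyTracker : IShardTracker {
    double getAverageShardBytes() override { return 1e6; }
};

int main() {
    auto context = std::make_shared<SharedContext>(/*id=*/1);
    ToyQueue queue;
    ToyTracker tracker;

    // Components register themselves; everyone else asks the context.
    context->relocationQueue = &queue;
    context->shardTracker = &tracker;

    assert(context->relocationQueue->getUnhealthyRelocationCount() == 3);
    assert(context->shardTracker->getAverageShardBytes() == 1e6);

    context->markTrackerCancelled();
    assert(context->isTrackerCancelled());
    return 0;
}
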
@@ -173,6 +173,13 @@ FDB_DECLARE_BOOLEAN_PARAM(IsRedundantTeam);
 FDB_DECLARE_BOOLEAN_PARAM(IsBadTeam);
 FDB_DECLARE_BOOLEAN_PARAM(WaitWiggle);

+// send request/signal to DDTeamCollection through interface
+// call synchronous method from components outside DDTeamCollection
+struct IDDTeamCollection {
+    PromiseStream<GetTeamRequest> getTeam;
+    virtual ~IDDTeamCollection() {}
+};
+
 class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
     friend class DDTeamCollectionImpl;
     friend class DDTeamCollectionUnitTest;
@@ -402,7 +409,7 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
     // in the next iteration of the loop. Otherwise, you may miss checking some elements in machineTeams
     bool removeMachineTeam(Reference<TCMachineTeamInfo> targetMT);

-    // Adds storage servers held on process of which the Process Id is “id” into excludeServers which prevent
+    // Adds storage servers held on process of which the Process id is “id” into excludeServers which prevent
     // recruiting the wiggling storage servers and let teamTracker start to move data off the affected teams;
     // Return a vector of futures wait for all data is moved to other teams.
     Future<Void> excludeStorageServersForWiggle(const UID& id);
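
IDDTeamCollection keeps the request/signal half of the split: callers still push a GetTeamRequest onto the getTeam stream and wait for a reply, while read-only queries move to synchronous getters on the other interfaces. A rough standalone model of that request/reply shape, using std::promise in place of FDB's PromiseStream; the GetTeamRequest fields and ToyTeamCollection shown here are illustrative only:

#include <cassert>
#include <future>
#include <queue>
#include <string>

// A request carries its reply channel with it, like FDB's GetTeamRequest.
struct GetTeamRequest {
    bool wantsHealthy = true;
    std::promise<std::string> reply; // reply.set_value(...) answers the caller
};

// The interface exposes only the request stream; the implementation drains it.
struct ITeamCollection {
    std::queue<GetTeamRequest> getTeam;
    virtual ~ITeamCollection() = default;
};

struct ToyTeamCollection : ITeamCollection {
    // In FDB this runs inside an actor loop; here we drain one request synchronously.
    void serveOne() {
        GetTeamRequest req = std::move(getTeam.front());
        getTeam.pop();
        req.reply.set_value(req.wantsHealthy ? "healthy-team" : "any-team");
    }
};

int main() {
    ToyTeamCollection tc;

    GetTeamRequest req;
    req.wantsHealthy = true;
    std::future<std::string> answer = req.reply.get_future();
    tc.getTeam.push(std::move(req));

    tc.serveOne();
    assert(answer.get() == "healthy-team");
    return 0;
}
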
@@ -36,6 +36,11 @@

 #include "flow/actorcompiler.h" // This must be the last #include.

+/////////////////////////////// Data //////////////////////////////////////
+#ifndef __INTEL_COMPILER
+#pragma region Data
+#endif
+
 // SOMEDAY: whether it's possible to combine RelocateReason and DataMovementReason together?
 // RelocateReason to DataMovementReason is one-to-N mapping
 class RelocateReason {
@@ -291,6 +296,7 @@ struct GetTopKMetricsReply {
     GetTopKMetricsReply(std::vector<KeyRangeStorageMetrics> const& m, double minReadLoad, double maxReadLoad)
       : shardMetrics(m), minReadLoad(minReadLoad), maxReadLoad(maxReadLoad) {}
 };
+
 struct GetTopKMetricsRequest {
     int topK = 1; // default only return the top 1 shard based on the GetTopKMetricsRequest::compare function
     std::vector<KeyRange> keys;
@@ -329,10 +335,6 @@ struct GetMetricsListRequest {
     GetMetricsListRequest(KeyRange const& keys, const int shardLimit) : keys(keys), shardLimit(shardLimit) {}
 };

-struct TeamCollectionInterface {
-    PromiseStream<GetTeamRequest> getTeam;
-};
-
 // DDShardInfo is so named to avoid link-time name collision with ShardInfo within the StorageServer
 struct DDShardInfo {
     Key key;
@@ -382,6 +384,37 @@ struct ShardTrackedData {
     Reference<AsyncVar<Optional<ShardMetrics>>> stats;
 };

+// Holds the permitted size and IO Bounds for a shard
+struct ShardSizeBounds {
+    StorageMetrics max;
+    StorageMetrics min;
+    StorageMetrics permittedError;
+
+    bool operator==(ShardSizeBounds const& rhs) const {
+        return max == rhs.max && min == rhs.min && permittedError == rhs.permittedError;
+    }
+};
+
+// Gets the permitted size and IO bounds for a shard
+ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
+
+// Determines the maximum shard size based on the size of the database
+int64_t getMaxShardSize(double dbSizeEstimate);
+
+#ifndef __INTEL_COMPILER
+#pragma endregion
+#endif
+
+// FIXME(xwang): Delete Old DD Actors once the refactoring is done
+/////////////////////////////// Old DD Actors //////////////////////////////////////
+#ifndef __INTEL_COMPILER
+#pragma region Old DD Actors
+#endif
+
+struct TeamCollectionInterface {
+    PromiseStream<GetTeamRequest> getTeam;
+};
+
 ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData,
                                            Database cx,
                                            PromiseStream<RelocateShard> output,
@@ -412,24 +445,14 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
                                          int teamSize,
                                          int singleRegionTeamSize,
                                          const DDEnabledState* ddEnabledState);
-
-// Holds the permitted size and IO Bounds for a shard
-struct ShardSizeBounds {
-    StorageMetrics max;
-    StorageMetrics min;
-    StorageMetrics permittedError;
-
-    bool operator==(ShardSizeBounds const& rhs) const {
-        return max == rhs.max && min == rhs.min && permittedError == rhs.permittedError;
-    }
-};
-
-// Gets the permitted size and IO bounds for a shard
-ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
-
-// Determines the maximum shard size based on the size of the database
-int64_t getMaxShardSize(double dbSizeEstimate);
-
+#ifndef __INTEL_COMPILER
+#pragma endregion
+#endif
+
+/////////////////////////////// Perpetual Storage Wiggle //////////////////////////////////////
+#ifndef __INTEL_COMPILER
+#pragma region Perpetual Storage Wiggle
+#endif
 class DDTeamCollection;

 struct StorageWiggleMetrics {
@@ -591,5 +614,9 @@ struct StorageWiggler : ReferenceCounted<StorageWiggler> {
     }
 };

+#ifndef __INTEL_COMPILER
+#pragma endregion
+#endif
+
 #include "flow/unactorcompiler.h"
 #endif