add perpetualStorageWiggler

This commit is contained in:
Xiaoxi Wang 2021-05-20 23:31:08 +00:00
parent 3f3a81b3d9
commit 85cd2b9945
5 changed files with 139 additions and 44 deletions

View File

@ -51,9 +51,11 @@ class TCMachineTeamInfo;
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self);
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self);
ACTOR Future<Void> waitForAllDataRemoved(Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams);
struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
UID id;
Version addedVersion; // Read version when this Server is added
DDTeamCollection* collection;
StorageServerInterface lastKnownInterface;
ProcessClass lastKnownClass;
@ -80,10 +82,11 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
DDTeamCollection* collection,
ProcessClass processClass,
bool inDesiredDC,
Reference<LocalitySet> storageServerSet)
Reference<LocalitySet> storageServerSet,
Version addedVersion = 0)
: id(ssi.id()), collection(collection), lastKnownInterface(ssi), lastKnownClass(processClass),
dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()),
inDesiredDC(inDesiredDC), storeType(KeyValueStoreType::END) {
inDesiredDC(inDesiredDC), storeType(KeyValueStoreType::END), addedVersion(addedVersion) {
localityEntry = ((LocalityMap<UID>*)storageServerSet.getPtr())->add(ssi.locality, &id);
}
@ -358,15 +361,17 @@ private:
};
struct ServerStatus {
bool isWiggling;
bool isFailed;
bool isUndesired;
bool isWrongConfiguration;
bool initialized; // AsyncMap erases default constructed objects
LocalityData locality;
ServerStatus() : isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {}
ServerStatus()
: isWiggling(false), isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {}
ServerStatus(bool isFailed, bool isUndesired, LocalityData const& locality)
: isFailed(isFailed), isUndesired(isUndesired), locality(locality), isWrongConfiguration(false),
initialized(true) {}
initialized(true), isWiggling(false) {}
bool isUnhealthy() const { return isFailed || isUndesired; }
const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : "Healthy"; }
@ -577,7 +582,7 @@ Future<Void> teamTracker(struct DDTeamCollection* const& self,
struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// clang-format off
enum { REQUESTING_WORKER = 0, GETTING_WORKER = 1, GETTING_STORAGE = 2 };
enum class Status { NONE = 0, EXCLUDED = 1, FAILED = 2 };
enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3};
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
// addActor is used to create the actorCollection when the dataDistributionTeamCollection is created
@ -659,6 +664,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
AsyncTrigger printDetailedTeamsInfo;
PromiseStream<GetMetricsRequest> getShardMetrics;
PromiseStream<Promise<int>> getUnhealthyRelocationCount;
Promise<UID> removeFailedServer;
void resetLocalitySet() {
@ -698,7 +704,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
bool primary,
Reference<AsyncVar<bool>> processingUnhealthy,
PromiseStream<GetMetricsRequest> getShardMetrics,
Promise<UID> removeFailedServer)
Promise<UID> removeFailedServer,
PromiseStream<Promise<int>> getUnhealthyRelocationCount)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
teamBuilder(Void()), badTeamRemover(Void()), checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()),
@ -713,7 +720,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary),
medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO), lastMedianAvailableSpaceUpdate(0),
processingUnhealthy(processingUnhealthy), lowestUtilizationTeam(0), highestUtilizationTeam(0),
getShardMetrics(getShardMetrics), removeFailedServer(removeFailedServer) {
getShardMetrics(getShardMetrics), removeFailedServer(removeFailedServer),
getUnhealthyRelocationCount(getUnhealthyRelocationCount) {
if (!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", distributorId).detail("State", "Inactive").trackLatest("DDTrackerStarting");
}
@ -2741,6 +2749,42 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("MachineTeams", machineTeams.size())
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER);
}
// Mark every storage server running on process `pid` as Status::WIGGLING in
// excludedServers so the data distributor migrates their data elsewhere
// (part of the perpetual storage wiggle).
//
// Returns one Future per server that was newly marked; each future completes
// once all data has been removed from that server (via waitForAllDataRemoved).
// Servers whose address already carries a non-NONE exclusion status are left
// alone so we don't clobber state owned by the trackExcludedServer actor.
// If any server was marked, recruiting is restarted so replacements can be
// brought up while the wiggled servers drain.
std::vector<Future<Void>> excludeStorageWigglingServers(const Value& pid) {
	std::vector<Future<Void>> moveFutures;
	// Single lookup instead of count() + operator[]; operator[] would also
	// default-insert an entry for an unknown pid.
	auto it = this->pid2server_info.find(pid);
	if (it == this->pid2server_info.end()) {
		return moveFutures; // no storage servers on this process
	}
	for (auto& info : it->second) {
		AddressExclusion addr(info->lastKnownInterface.address().ip);
		if (this->excludedServers.count(addr) &&
		    this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) {
			continue; // don't overwrite the value set by actor trackExcludedServer
		}
		this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
		moveFutures.push_back(
		    waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this));
	}
	if (!moveFutures.empty()) {
		this->restartRecruiting.trigger(); // begin recruiting replacement storage
	}
	return moveFutures;
}
// Re-include the storage servers on process `pid` whose exclusion status is
// Status::WIGGLING, i.e. undo what excludeStorageWigglingServers() did once
// the wiggle step finishes (or is paused). Exclusions set to any other status
// (e.g. by the trackExcludedServer actor) are left untouched. If at least one
// server was re-included, recruiting is restarted.
void includeStorageWigglingServers(const Value& pid) {
	// Fix: the previous code used pid2server_info[pid], which default-inserts
	// an empty entry when `pid` is unknown (this is reachable: callers may pass
	// a reset/empty pid after a wiggle round completes).
	auto it = this->pid2server_info.find(pid);
	if (it == this->pid2server_info.end()) {
		return;
	}
	bool included = false;
	for (auto& info : it->second) {
		AddressExclusion addr(info->lastKnownInterface.address().ip);
		if (this->excludedServers.count(addr) &&
		    this->excludedServers.get(addr) == DDTeamCollection::Status::WIGGLING) {
			this->excludedServers.set(addr, DDTeamCollection::Status::NONE);
			included = true;
		}
	}
	if (included) {
		this->restartRecruiting.trigger();
	}
}
};
TCServerInfo::~TCServerInfo() {
@ -3381,6 +3425,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
state vector<Future<Void>> change;
bool anyUndesired = false;
bool anyWrongConfiguration = false;
bool anyWigglingServer = false;
int serversLeft = 0;
for (const UID& uid : team->getServerIDs()) {
@ -3395,6 +3440,9 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
if (status.isWrongConfiguration) {
anyWrongConfiguration = true;
}
if (status.isWiggling) {
anyWigglingServer = true;
}
}
if (serversLeft == 0) {
@ -3412,7 +3460,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
change.push_back(self->zeroHealthyTeams->onChange());
bool healthy = !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize;
bool healthy =
!badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize && !anyWigglingServer;
team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam
bool optimal = team->isOptimal() && healthy;
bool containsFailed = teamContainsFailedServer(self, team);
@ -3519,6 +3568,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
} else if (anyUndesired) {
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER);
} else if (anyWigglingServer) {
team->setPriority(SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE);
} else {
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY);
}
@ -3549,7 +3600,9 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
lastZeroHealthy =
self->zeroHealthyTeams->get(); // set this again in case it changed from this teams health changing
if ((self->initialFailureReactionDelay.isReady() && !self->zeroHealthyTeams->get()) || containsFailed) {
if ((self->initialFailureReactionDelay.isReady() && !self->zeroHealthyTeams->get()) || containsFailed ||
anyWigglingServer) {
vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor(
ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary));
@ -3817,46 +3870,68 @@ ACTOR Future<Void> perpetualStorageWiggler(FutureStream<Void> stopSignal,
const DDEnabledState* ddEnabledState) {
state Promise<Value> pidPromise;
state Future<Void> watchFuture;
state Future<Void> moveFinishFuture = Never();
state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY);
state AsyncTrigger restart;
state Future<Void> ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow);
state Value pid;
wait(store(watchFuture, watchPerpetualStoragePIDChange(self->cx, pidPromise)));
loop choose {
when(waitNext(stopSignal)) { break; }
when(wait(watchFuture)) { wait(store(watchFuture, watchPerpetualStoragePIDChange(self->cx, pidPromise))); }
when(state Value pid = wait(pidPromise.getFuture())) {
when(wait(store(pid, pidPromise.getFuture()))) {
pidPromise.reset();
if (self->pid2server_info.count(pid) != 0) {
std::vector<Future<Void>> moveFutures;
for (auto& info : self->pid2server_info[pid]) {
AddressExclusion addr(info->lastKnownInterface.address().ip);
if (self->excludedServers.count(addr) &&
self->excludedServers.get(addr) != DDTeamCollection::Status::NONE) {
continue; // don't override the value set by actor trackExcludedServer
}
self->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
moveFutures.push_back(
waitForAllDataRemoved(self->cx, info->lastKnownInterface.id(), info->addedVersion, self));
}
// wait for all data is moved from this process
if (!moveFutures.empty()) {
self->restartRecruiting.trigger();
wait(waitForAllReady(moveFutures));
}
// re-include wiggling storage servers
for (auto& info : self->pid2server_info[pid]) {
AddressExclusion addr(info->lastKnownInterface.address().ip);
if (!self->excludedServers.count(addr) ||
self->excludedServers.get(addr) != DDTeamCollection::Status::WIGGLING) {
continue;
}
self->excludedServers.set(addr, DDTeamCollection::Status::NONE);
}
if (self->healthyTeamCount <= 1) { // pre-check health status
pauseWiggle.trigger();
} else {
auto fv = self->excludeStorageWigglingServers(pid);
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", fv.size());
}
// finish Wiggle this process
}
when(wait(restart.onTrigger())) {
auto fv = self->excludeStorageWigglingServers(pid);
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleRestart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", fv.size());
}
when(wait(moveFinishFuture)) {
moveFinishFuture = Never();
self->includeStorageWigglingServers(pid);
finishStorageWiggleSignal.send(Void());
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId).detail("ProcessId", pid);
pid = Value();
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->zeroHealthyTeams->get()) {
pauseWiggle.trigger();
}
}
when(wait(ddQueueCheck)) {
Promise<int> countp;
self->getUnhealthyRelocationCount.send(countp);
int count = wait(countp.getFuture());
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD) {
pauseWiggle.trigger();
} else if (count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && self->healthyTeamCount > 1) {
restart.trigger();
}
ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow);
}
when(wait(pauseWiggle.onTrigger())) {
moveFinishFuture = Never();
self->includeStorageWigglingServers(pid);
TraceEvent("PerpetualStorageWigglePause", self->distributorId).detail("ProcessId", pid);
}
}
self->includeStorageWigglingServers(pid);
return Void();
}
@ -3884,6 +3959,8 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
if (speed == 1) {
collection.add(perpetualStorageWiggleIterator(
stopWiggleSignal.getFuture(), finishStorageWiggleSignal.getFuture(), teamCollection));
collection.add(perpetualStorageWiggler(
stopWiggleSignal.getFuture(), finishStorageWiggleSignal, teamCollection, ddEnabledState));
finishStorageWiggleSignal.send(Void());
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
} else {
@ -4302,7 +4379,12 @@ ACTOR Future<Void> storageServerTracker(
otherChanges.push_back(self->excludedServers.onChange(testAddr));
}
if (worstStatus != DDTeamCollection::Status::NONE) {
if (worstStatus == DDTeamCollection::Status::WIGGLING) {
TraceEvent("WigglingStorageServer", self->distributorId)
.detail("Server", server->id)
.detail("Address", worstAddr.toString());
status.isWiggling = true;
} else if (worstStatus != DDTeamCollection::Status::NONE) {
TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
.detail("Server", server->id)
.detail("Excluded", worstAddr.toString());
@ -5270,6 +5352,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
state PromiseStream<RelocateShard> output;
state PromiseStream<RelocateShard> input;
state PromiseStream<Promise<int64_t>> getAverageShardBytes;
state PromiseStream<Promise<int>> getUnhealthyRelocationCount;
state PromiseStream<GetMetricsRequest> getShardMetrics;
state Reference<AsyncVar<bool>> processingUnhealthy(new AsyncVar<bool>(false));
state Promise<Void> readyToStart;
@ -5353,6 +5436,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
shardsAffectedByTeamFailure,
lock,
getAverageShardBytes,
getUnhealthyRelocationCount,
self->ddId,
storageTeamSize,
configuration.storageTeamSize,
@ -5377,7 +5461,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
true,
processingUnhealthy,
getShardMetrics,
removeFailedServer);
removeFailedServer,
getUnhealthyRelocationCount);
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
if (configuration.usableRegions > 1) {
remoteTeamCollection =
@ -5394,7 +5479,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
false,
processingUnhealthy,
getShardMetrics,
removeFailedServer);
removeFailedServer,
getUnhealthyRelocationCount);
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back(
@ -5866,7 +5952,8 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
Promise<UID>(),
PromiseStream<Promise<int>>()));
for (int id = 1; id <= processCount; ++id) {
UID uid(id, 0);
@ -5908,7 +5995,8 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
Promise<UID>(),
PromiseStream<Promise<int>>()));
for (int id = 1; id <= processCount; id++) {
UID uid(id, 0);

View File

@ -263,6 +263,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<Promise<int64_t>> getAverageShardBytes,
PromiseStream<Promise<int>> getUnhealthyRelocationCount,
UID distributorId,
int teamSize,
int singleRegionTeamSize,

View File

@ -1550,6 +1550,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<Promise<int64_t>> getAverageShardBytes,
PromiseStream<Promise<int>> getUnhealthyRelocationCount,
UID distributorId,
int teamSize,
int singleRegionTeamSize,
@ -1679,6 +1680,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(balancingFutures))) {}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount.getFuture())) {
r.send(self.unhealthyRelocations);
}
}
}
} catch (Error& e) {

View File

@ -131,6 +131,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 140 );
init( PRIORITY_TEAM_HEALTHY, 140 );
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
init( PRIORITY_TEAM_REDUNDANT, 200 );

View File

@ -133,6 +133,7 @@ public:
int PRIORITY_RECOVER_MOVE;
int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM;
int PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
int PRIORITY_PERPETUAL_STORAGE_WIGGLE;
int PRIORITY_TEAM_HEALTHY;
int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
int PRIORITY_TEAM_REDUNDANT;