Merge pull request #5018 from sfc-gh-xwang/ppwiggle

Solve cluster status oscillation when ss is not enough for perpetual wiggle
This commit is contained in:
Xiaoxi Wang 2021-06-23 10:03:58 -07:00 committed by GitHub
commit 4e97cbc2d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 37 deletions

View File

@ -210,6 +210,7 @@ void ServerKnobs::initialize(Randomize _randomize, ClientKnobs* clientKnobs, IsS
init( ALL_DATA_REMOVED_DELAY, 1.0 );
init( INITIAL_FAILURE_REACTION_DELAY, 30.0 ); if( randomize && BUGGIFY ) INITIAL_FAILURE_REACTION_DELAY = 0.0;
init( CHECK_TEAM_DELAY, 30.0 );
init( PERPETUAL_WIGGLE_DELAY, 50.0 );
init( LOG_ON_COMPLETION_DELAY, DD_QUEUE_LOGGING_INTERVAL );
init( BEST_TEAM_MAX_TEAM_TRIES, 10 );
init( BEST_TEAM_OPTION_COUNT, 4 );

View File

@ -160,6 +160,7 @@ public:
double ALL_DATA_REMOVED_DELAY;
double INITIAL_FAILURE_REACTION_DELAY;
double CHECK_TEAM_DELAY;
double PERPETUAL_WIGGLE_DELAY;
double LOG_ON_COMPLETION_DELAY;
int BEST_TEAM_MAX_TEAM_TRIES;
int BEST_TEAM_OPTION_COUNT;

View File

@ -2840,8 +2840,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
this->wiggle_addresses.push_back(addr);
this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
moveFutures.push_back(
waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this));
moveFutures.push_back(info->onRemoved);
}
if (!moveFutures.empty()) {
this->restartRecruiting.trigger();
@ -3898,29 +3897,27 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
// to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0.
ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) {
state ReadYourWritesTransaction tr(teamCollection->cx);
state Value writeValue = LiteralStringRef("0");
state Value writeValue;
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> value = wait(tr.get(wigglingStorageServerKey));
if (teamCollection->pid2server_info.empty()) {
tr.set(wigglingStorageServerKey, LiteralStringRef("0"));
writeValue = LiteralStringRef("");
} else {
Value pid = teamCollection->pid2server_info.begin()->first;
if (value.present()) {
auto nextIt = teamCollection->pid2server_info.upper_bound(value.get());
if (nextIt == teamCollection->pid2server_info.end()) {
tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
} else {
tr.set(wigglingStorageServerKey, nextIt->first);
writeValue = nextIt->first;
}
} else {
tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
}
}
tr.set(wigglingStorageServerKey, writeValue);
wait(tr.commit());
break;
} catch (Error& e) {
@ -3939,10 +3936,20 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) {
state int lastFinishTime = now();
loop {
choose {
when(wait(stopSignal->onChange())) {}
when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); }
when(waitNext(finishStorageWiggleSignal)) {
state bool takeRest = true; // delay to avoid delete and update ServerList too frequently
while (takeRest) {
wait(delayJittered(SERVER_KNOBS->PERPETUAL_WIGGLE_DELAY));
// there must not have other teams to place wiggled data
takeRest = teamCollection->server_info.size() <= teamCollection->configuration.storageTeamSize ||
teamCollection->machine_info.size() < teamCollection->configuration.storageTeamSize;
}
wait(updateNextWigglingStoragePID(teamCollection));
}
}
if (stopSignal->get()) {
break;
@ -3976,17 +3983,24 @@ ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(DDTe
}
// periodically check whether the cluster is healthy if we continue perpetual wiggle
ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self) {
ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self, int* extraTeamCount) {
state int pausePenalty = 1;
loop {
Promise<int> countp;
self->getUnhealthyRelocationCount.send(countp);
int count = wait(countp.getFuture());
// pause wiggle when
// a. DDQueue is busy with unhealthy relocation request
// b. no healthy team
// b. healthy teams are not enough
// c. the overall disk space is not enough
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount == 0 ||
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD || self->healthyTeamCount <= *extraTeamCount ||
self->bestTeamStuck) {
// if we pause wiggle not because the reason a, increase extraTeamCount. This helps avoid oscillation
// between pause and non-pause status.
if ((self->healthyTeamCount <= *extraTeamCount || self->bestTeamStuck) && !self->pauseWiggle->get()) {
*extraTeamCount = std::min(*extraTeamCount + pausePenalty, (int)self->teams.size());
pausePenalty = std::min(pausePenalty * 2, (int)self->teams.size());
}
self->pauseWiggle->set(true);
} else {
self->pauseWiggle->set(false);
@ -4004,9 +4018,9 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
const DDEnabledState* ddEnabledState) {
state Future<Void> watchFuture = Never();
state Future<Void> moveFinishFuture = Never();
state Future<Void> ddQueueCheck = clusterHealthCheckForPerpetualWiggle(self);
state int extraTeamCount = 0;
state Future<Void> ddQueueCheck = clusterHealthCheckForPerpetualWiggle(self, &extraTeamCount);
state int movingCount = 0;
state vector<UID> excludedServerIds;
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self));
ASSERT(!self->wigglingPid.present()); // only single process wiggle is allowed
self->wigglingPid = Optional<Key>(res.second);
@ -4020,26 +4034,19 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
self->includeStorageServersForWiggle();
TraceEvent("PerpetualStorageWigglePause", self->distributorId)
.detail("ProcessId", pid)
.detail("ExtraHealthyTeamCount", extraTeamCount)
.detail("HealthyTeamCount", self->healthyTeamCount)
.detail("StorageCount", movingCount);
} else {
// pre-check whether wiggling chosen servers still satisfy replica requirement
excludedServerIds.clear();
for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) {
excludedServerIds.push_back(info->id);
}
if (_exclusionSafetyCheck(excludedServerIds, self)) {
TEST(true); // start wiggling
auto fv = self->excludeStorageServersForWiggle(pid);
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
} else {
TEST(true); // skip wiggling current process
TraceEvent("PerpetualStorageWiggleSkip", self->distributorId).detail("ProcessId", pid.toString());
moveFinishFuture = Void();
}
TEST(true); // start wiggling
auto fv = self->excludeStorageServersForWiggle(pid);
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("ProcessId", pid)
.detail("ExtraHealthyTeamCount", extraTeamCount)
.detail("HealthyTeamCount", self->healthyTeamCount)
.detail("StorageCount", movingCount);
}
}
@ -4055,9 +4062,9 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
wait(delayJittered(5.0, TaskPriority::DataDistributionLow));
}
when(wait(moveFinishFuture)) {
TEST(true); // finish wiggling this process
ASSERT(self->wigglingPid.present());
StringRef pid = self->wigglingPid.get();
TEST(pid != LiteralStringRef("")); // finish wiggling this process
moveFinishFuture = Never();
self->includeStorageServersForWiggle();
@ -4068,6 +4075,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
self->wigglingPid.reset();
watchFuture = res.first;
finishStorageWiggleSignal.send(Void());
extraTeamCount = std::max(0, extraTeamCount - 1);
}
when(wait(ddQueueCheck || self->pauseWiggle->onChange() || stopSignal->onChange())) {}
}
@ -4093,7 +4101,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection,
const DDEnabledState* ddEnabledState) {
state int speed = 0;
state AsyncVar<bool> stopWiggleSignal(false);
state AsyncVar<bool> stopWiggleSignal(true);
state PromiseStream<Void> finishStorageWiggleSignal;
state SignalableActorCollection collection;
teamCollection->pauseWiggle = makeReference<AsyncVar<bool>>(true);
@ -4119,11 +4127,13 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
collection.add(perpetualStorageWiggler(
&stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
} else if (speed == 0 && !stopWiggleSignal.get()) {
stopWiggleSignal.set(true);
wait(collection.signalAndReset());
} else if (speed == 0) {
if (!stopWiggleSignal.get()) {
stopWiggleSignal.set(true);
wait(collection.signalAndReset());
teamCollection->pauseWiggle->set(true);
}
TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId);
teamCollection->pauseWiggle->set(true);
}
wait(watchFuture);
break;

View File

@ -1662,6 +1662,8 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM])
.detail("PriorityRebalanceOverutilizedTeam",
self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM])
.detail("PriorityStorageWiggle",
self.priority_relocations[SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE])
.detail("PriorityTeamHealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_HEALTHY])
.detail("PriorityTeamContainsUndesiredServer",
self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER])