add useful trace; add invalid wiggling server check

This commit is contained in:
Xiaoxi Wang 2021-06-10 04:27:45 +00:00
parent 4220a548ce
commit cd58c0c149
2 changed files with 46 additions and 9 deletions

View File

@ -3536,8 +3536,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
change.push_back(self->zeroHealthyTeams->onChange());
bool healthy =
!badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize && !anyWigglingServer;
bool healthy = !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize;
team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam
bool optimal = team->isOptimal() && healthy;
bool containsFailed = teamContainsFailedServer(self, team);
@ -3891,6 +3890,7 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
// to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0.
ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) {
state ReadYourWritesTransaction tr(teamCollection->cx);
state Value writeValue = LiteralStringRef("0");
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -3903,11 +3903,14 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
auto nextIt = teamCollection->pid2server_info.upper_bound(value.get());
if (nextIt == teamCollection->pid2server_info.end()) {
tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
} else {
tr.set(wigglingStorageServerKey, nextIt->first);
writeValue = nextIt->first;
}
} else {
tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
}
}
wait(tr.commit());
@ -3916,6 +3919,9 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
wait(tr.onError(e));
}
}
TraceEvent(SevDebug, "PerpetualNextWigglingStoragePID", teamCollection->distributorId)
.detail("WriteValue", writeValue);
return Void();
}
@ -3925,9 +3931,6 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) {
// initialize PID
wait(updateNextWigglingStoragePID(teamCollection));
loop choose {
when(wait(stopSignal->onTrigger())) { break; }
when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); }
@ -4068,9 +4071,8 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) {
pauseWiggle.trigger();
}
else if (isPaused && count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD &&
self->healthyTeamCount > 1 && _exclusionSafetyCheck(excludedServerIds, self)) {
} else if (isPaused && count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD &&
self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) {
restart.trigger();
}
ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow);
@ -4450,6 +4452,7 @@ ACTOR Future<Void> storageServerTracker(
loop {
status.isUndesired = !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get();
status.isWrongConfiguration = false;
status.isWiggling = false;
hasWrongDC = !isCorrectDC(self, server);
hasInvalidLocality =
!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
@ -4529,10 +4532,21 @@ ACTOR Future<Void> storageServerTracker(
status.isWrongConfiguration = true;
}
// An invalid wiggle server should set itself the right status. Otherwise, it cannot be re-included by
// wiggler.
auto invalidWiggleServer =
[](const AddressExclusion& addr, const DDTeamCollection* tc, const TCServerInfo* server) {
return server->lastKnownInterface.locality.processId() != tc->wigglingPid;
};
// If the storage server is in the excluded servers list, it is undesired
NetworkAddress a = server->lastKnownInterface.address();
AddressExclusion worstAddr(a.ip, a.port);
DDTeamCollection::Status worstStatus = self->excludedServers.get(worstAddr);
if (worstStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(worstAddr, self, server)) {
self->excludedServers.set(worstAddr, DDTeamCollection::Status::NONE);
worstStatus = DDTeamCollection::Status::NONE;
}
otherChanges.push_back(self->excludedServers.onChange(worstAddr));
for (int i = 0; i < 3; i++) {
@ -4548,6 +4562,12 @@ ACTOR Future<Void> storageServerTracker(
else if (i == 2)
testAddr = AddressExclusion(server->lastKnownInterface.secondaryAddress().get().ip);
DDTeamCollection::Status testStatus = self->excludedServers.get(testAddr);
if (testStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(testAddr, self, server)) {
self->excludedServers.set(testAddr, DDTeamCollection::Status::NONE);
testStatus = DDTeamCollection::Status::NONE;
}
if (testStatus > worstStatus) {
worstStatus = testStatus;
worstAddr = testAddr;
@ -4631,11 +4651,14 @@ ACTOR Future<Void> storageServerTracker(
bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
newInterface.first.locality.zoneId().get();
bool processIdChanged = server->lastKnownInterface.locality.processId().get() !=
newInterface.first.locality.processId().get();
TraceEvent("StorageServerInterfaceChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged)
.detail("ProcessIdChanged", processIdChanged)
.detail("MachineLocalityChanged", machineLocalityChanged);
server->lastKnownInterface = newInterface.first;
@ -4680,6 +4703,20 @@ ACTOR Future<Void> storageServerTracker(
ASSERT(destMachine.isValid());
}
// update pid2server_info if the process id has changed
if (processIdChanged) {
self->pid2server_info[newInterface.first.locality.processId().get()].push_back(
self->server_info[server->id]);
// delete the old one
auto& old_infos =
self->pid2server_info[server->lastKnownInterface.locality.processId().get()];
for (int i = 0; i < old_infos.size(); ++i) {
if (old_infos[i].getPtr() == server) {
std::swap(old_infos[i--], old_infos.back());
old_infos.pop_back();
}
}
}
// Ensure the server's server team belong to a machine team, and
// Get the newBadTeams due to the locality change
vector<Reference<TCTeamInfo>> newBadTeams;

View File

@ -131,7 +131,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 140 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 139 );
init( PRIORITY_TEAM_HEALTHY, 140 );
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
init( PRIORITY_TEAM_REDUNDANT, 200 );