fix: we need to build teams if a server becomes healthy and it is not already on any teams
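
The tracker is moved below removeBadTeams and now takes the DDTeamCollection and TCServerInfo directly, so it can reach teams, doBuildTeams, restartRecruiting, unhealthyServers, and masterId through self instead of receiving them as separate parameters. The core of the fix is the new check inside the tracker's loop; a minimal sketch of that logic, stripped of the actor machinery (names follow the diff below; that the team builder polls doBuildTeams is an assumption based on the surrounding code):

	status->isFailed = !status->isFailed;               // the failure monitor reported a transition
	if( !status->isFailed && !server->teams.size() ) {  // healthy again, but not on any team
		self->doBuildTeams = true;                      // assumed: picked up by the team builder on its next pass
	}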

Evan Tschannen 2018-11-09 18:06:00 -08:00
parent 56c51c1bb3
commit 7c23b68501
1 changed file with 52 additions and 51 deletions


@@ -299,54 +299,6 @@ ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version addedVersion )
	}
}
-ACTOR Future<Void> storageServerFailureTracker(
-	Database cx,
-	StorageServerInterface server,
-	ServerStatusMap *statusMap,
-	ServerStatus *status,
-	Debouncer* restartRecruiting,
-	int64_t *unhealthyServers,
-	UID masterId,
-	Version addedVersion )
-{
-	loop {
-		if( statusMap->get(server.id()).initialized ) {
-			bool unhealthy = statusMap->get(server.id()).isUnhealthy();
-			if(unhealthy && !status->isUnhealthy()) {
-				(*unhealthyServers)--;
-			}
-			if(!unhealthy && status->isUnhealthy()) {
-				(*unhealthyServers)++;
-			}
-		} else if(status->isUnhealthy()) {
-			(*unhealthyServers)++;
-		}
-		statusMap->set( server.id(), *status );
-		if( status->isFailed )
-			restartRecruiting->trigger();
-		state double startTime = now();
-		choose {
-			when ( Void _ = wait( status->isFailed
-				? IFailureMonitor::failureMonitor().onStateEqual( server.waitFailure.getEndpoint(), FailureStatus(false) )
-				: waitFailureClient(server.waitFailure, SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME, 0, TaskDataDistribution) ) )
-			{
-				double elapsed = now() - startTime;
-				if(!status->isFailed && elapsed < SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME) {
-					Void _ = wait(delay(SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME - elapsed));
-				}
-				status->isFailed = !status->isFailed;
-				TraceEvent("StatusMapChange", masterId).detail("ServerID", server.id()).detail("Status", status->toString())
-					.detail("Available", IFailureMonitor::failureMonitor().getState(server.waitFailure.getEndpoint()).isAvailable());
-			}
-			when ( Void _ = wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, server.id(), addedVersion) : Never() ) ) { break; }
-		}
-	}
-	return Void();
-}
// Read keyservers, return unique set of teams
ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Database cx, UID masterId, MoveKeysLock moveKeysLock, std::vector<Optional<Key>> remoteDcIds ) {
	state Reference<InitialDataDistribution> result = Reference<InitialDataDistribution>(new InitialDataDistribution);
@@ -1762,6 +1714,57 @@ ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
	return Void();
}
+ACTOR Future<Void> storageServerFailureTracker(
+	DDTeamCollection* self,
+	TCServerInfo *server,
+	Database cx,
+	ServerStatusMap *statusMap,
+	ServerStatus *status,
+	Version addedVersion )
+{
+	state StorageServerInterface interf = server->lastKnownInterface;
+	loop {
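+		// Keep DDTeamCollection's unhealthyServers count in sync with the
+		// transition from the previously recorded status to the one recorded below.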
+		if( statusMap->get(interf.id()).initialized ) {
+			bool unhealthy = statusMap->get(interf.id()).isUnhealthy();
+			if(unhealthy && !status->isUnhealthy()) {
+				self->unhealthyServers--;
+			}
+			if(!unhealthy && status->isUnhealthy()) {
+				self->unhealthyServers++;
+			}
+		} else if(status->isUnhealthy()) {
+			self->unhealthyServers++;
+		}
+		statusMap->set( interf.id(), *status );
+		if( status->isFailed )
+			self->restartRecruiting.trigger();
+		state double startTime = now();
+		choose {
+			when ( Void _ = wait( status->isFailed
+				? IFailureMonitor::failureMonitor().onStateEqual( interf.waitFailure.getEndpoint(), FailureStatus(false) )
+				: waitFailureClient(interf.waitFailure, SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME, 0, TaskDataDistribution) ) )
+			{
+				double elapsed = now() - startTime;
+				if(!status->isFailed && elapsed < SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME) {
+					Void _ = wait(delay(SERVER_KNOBS->DATA_DISTRIBUTION_FAILURE_REACTION_TIME - elapsed));
+				}
+				status->isFailed = !status->isFailed;
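+				// The actual fix: a server that just became healthy but is not on any
+				// team would otherwise never be placed on one; trigger team building.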
+				if(!status->isFailed && !server->teams.size()) {
+					self->doBuildTeams = true;
+				}
+				TraceEvent("StatusMapChange", self->masterId).detail("ServerID", interf.id()).detail("Status", status->toString())
+					.detail("Available", IFailureMonitor::failureMonitor().getState(interf.waitFailure.getEndpoint()).isAvailable());
+			}
+			when ( Void _ = wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, interf.id(), addedVersion) : Never() ) ) { break; }
+		}
+	}
+	return Void();
+}
ACTOR Future<Void> storageServerTracker(
	DDTeamCollection* self,
	Database cx,
@@ -1858,7 +1861,7 @@ ACTOR Future<Void> storageServerTracker(
			otherChanges.push_back( self->excludedServers.onChange( addr ) );
			otherChanges.push_back( self->excludedServers.onChange( ipaddr ) );
-			failureTracker = storageServerFailureTracker( cx, server->lastKnownInterface, statusMap, &status, &self->restartRecruiting, &self->unhealthyServers, masterId, addedVersion );
+			failureTracker = storageServerFailureTracker( self, server, cx, statusMap, &status, addedVersion );
			//We need to recruit new storage servers if the key value store type has changed
			if(hasWrongStoreTypeOrDC)
@@ -2412,8 +2415,6 @@ ACTOR Future<Void> dataDistribution(
				if(configuration.usableRegions > 1) {
					teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
				}
-				TraceEvent("DDInitShard").detail("Keys", printable(keys)).detail("PrimarySrc", describe(initData->shards[shard].primarySrc)).detail("RemoteSrc", describe(initData->shards[shard].remoteSrc))
-					.detail("PrimaryDest", describe(initData->shards[shard].primaryDest)).detail("RemoteDest", describe(initData->shards[shard].remoteDest));
				shardsAffectedByTeamFailure->moveShard(keys, teams);
				if(initData->shards[shard].hasDest) {
					// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in