Fix some corner case bugs exposed by simulation.

In one case, when a storage server (SS) joins the cluster and data distribution (DD) cannot find any healthy server to form a team with the newly added server, the SS never gets added to a team, even after the other servers become healthy again.
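The fix, visible in the DDTeamCollection hunks below, is to remember that a team-building pass failed and to retry team building once a server transitions back to healthy. A minimal sketch of that mechanism, with illustrative names (TeamBuilderState and its methods are not the real DD API; only the two flags and targetTeamNumPerServer come from the diff):

#include <cstddef>

// Sketch: a failed team-building pass is remembered, and a server's
// unhealthy -> healthy transition re-triggers team building even when
// that server already belongs to some (but too few) teams.
struct TeamBuilderState {
    bool lastBuildTeamsFailed = false; // set when no healthy team could be formed
    bool doBuildTeams = false;         // tells DD to run another team-building pass

    void onBuildTeamsFinished(bool formedAllNeededTeams) {
        if (!formedAllNeededTeams)
            lastBuildTeamsFailed = true;
    }

    void onServerBecameHealthy(std::size_t serverTeamCount, std::size_t targetTeamNumPerServer) {
        // Rebuild if the server is under-teamed OR a previous pass failed.
        if (serverTeamCount < targetTeamNumPerServer || lastBuildTeamsFailed)
            doBuildTeams = true;
    }
};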
The other is an extreme case: a data center is down, and an SS in the active DC joins and then dies immediately, but not before DD adds it to a destination team for a relocating shard. DD then waits indefinitely for the dead data center to come back up before the cluster can fully recover.
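The fix has two parts. Each SS now reports its version in GetStorageMetricsReply, and DD marks a server undesired when it lags the cluster's current read version by more than DD_SS_FAILURE_VERSIONLAG, clearing the flag only once the lag drops below the lower DD_SS_ALLOWED_VERSIONLAG; the two knobs form a hysteresis band so the flag does not flap. Separately, the KillRegion workload now retries the configuration change instead of waiting forever on storage recovery. A minimal sketch of the hysteresis logic under those assumptions (VersionLagMonitor is illustrative, not the real type):

#include <cstdint>

using Version = int64_t;

// Hysteresis: trip above the failure threshold, reset only below the
// (lower) allowed threshold, so a server hovering near one limit does
// not oscillate between healthy and too-far-behind.
struct VersionLagMonitor {
    Version failLag;    // e.g. DD_SS_FAILURE_VERSIONLAG (default 250000000)
    Version allowedLag; // e.g. DD_SS_ALLOWED_VERSIONLAG (default 200000000)
    bool tooFarBehind = false;

    bool update(Version versionNow, Version ssVersion) {
        Version diff = versionNow - ssVersion;
        if (diff > failLag)
            tooFarBehind = true;   // SS treated as undesired by DD
        else if (diff < allowedLag)
            tooFarBehind = false;  // SS has caught back up
        return tooFarBehind;       // in between: keep the previous state
    }
};

Since FoundationDB advances versions at roughly one million per second, the default thresholds correspond to about 250 and 200 seconds of lag.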
Balachandar Namasivayam 2020-02-19 14:13:27 -08:00
parent ad1dd4fd9b
commit b1c3893d40
5 changed files with 49 additions and 30 deletions


@@ -400,7 +400,7 @@ struct GetStorageMetricsReply {
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, load, available, capacity, bytesInputRate);
+		serializer(ar, load, available, capacity, bytesInputRate, version);
 	}
 };


@@ -156,11 +156,16 @@ ACTOR Future<Void> updateServerMetrics( TCServerInfo *server, Database cx ) {
 			tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
 			return tr->getReadVersion(); }));
 		Version versionDiff = versionNow - server->serverMetrics.get().version;
-		if(versionDiff > 15000000) { // TODO: Knobbify
-			TraceEvent(SevInfo, "SSVersionDiff").detail("ServerId", server->id.toString()).detail("VersionNow", versionNow).detail("SSVersion", server->serverMetrics.get().version).detail("Diff", versionDiff);
-			server->ssVersionTooFarBehind.set(true);
-		} else if(versionDiff < 10000000) {
-			server->ssVersionTooFarBehind.set(false);
+		if(versionDiff > SERVER_KNOBS->DD_SS_FAILURE_VERSIONLAG) {
+			if(server->ssVersionTooFarBehind.get() == false) {
+				TraceEvent(SevInfo, "SSVersionDiffLarge").detail("ServerId", server->id.toString()).detail("VersionNow", versionNow).detail("SSVersion", server->serverMetrics.get().version).detail("Diff", versionDiff);
+				server->ssVersionTooFarBehind.set(true);
+			}
+		} else if(versionDiff < SERVER_KNOBS->DD_SS_ALLOWED_VERSIONLAG) {
+			if(server->ssVersionTooFarBehind.get() == true) {
+				TraceEvent(SevInfo, "SSVersionDiffNormal").detail("ServerId", server->id.toString()).detail("VersionNow", versionNow).detail("SSVersion", server->serverMetrics.get().version).detail("Diff", versionDiff);
+				server->ssVersionTooFarBehind.set(false);
+			}
 		}
 	}
 	return Void();
@@ -392,16 +397,15 @@ private:
 struct ServerStatus {
 	bool isFailed;
 	bool isUndesired;
-	bool isTooFarBehind;
 	bool isWrongConfiguration;
 	bool initialized; //AsyncMap erases default constructed objects
 	LocalityData locality;
-	ServerStatus() : isFailed(true), isUndesired(false), isTooFarBehind(false), isWrongConfiguration(false), initialized(false) {}
-	ServerStatus( bool isFailed, bool isUndesired, bool isTooFarBehind, LocalityData const& locality ) : isFailed(isFailed), isUndesired(isUndesired), isTooFarBehind(isTooFarBehind), locality(locality), isWrongConfiguration(false), initialized(true) {}
-	bool isUnhealthy() const { return isFailed || isUndesired || isTooFarBehind; }
-	const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : isTooFarBehind ? "TooFarBehind" : "Healthy"; }
+	ServerStatus() : isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {}
+	ServerStatus( bool isFailed, bool isUndesired, LocalityData const& locality ) : isFailed(isFailed), isUndesired(isUndesired), locality(locality), isWrongConfiguration(false), initialized(true) {}
+	bool isUnhealthy() const { return isFailed || isUndesired; }
+	const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : "Healthy"; }

-	bool operator == (ServerStatus const& r) const { return isFailed == r.isFailed && isUndesired == r.isUndesired && isTooFarBehind == r.isTooFarBehind && isWrongConfiguration == r.isWrongConfiguration && locality == r.locality && initialized == r.initialized; }
+	bool operator == (ServerStatus const& r) const { return isFailed == r.isFailed && isUndesired == r.isUndesired && isWrongConfiguration == r.isWrongConfiguration && locality == r.locality && initialized == r.initialized; }

 	//If a process has reappeared without the storage server that was on it (isFailed == true), we don't need to exclude it
 	//We also don't need to exclude processes who are in the wrong configuration (since those servers will be removed)
@@ -2270,6 +2274,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
 					.detail("DoBuildTeams", self->doBuildTeams)
 					.trackLatest("TeamCollectionInfo");
-		}
+		} else {
+			self->lastBuildTeamsFailed = true;
+		}
 		self->evaluateTeamQuality();
@@ -3491,9 +3497,10 @@ ACTOR Future<Void> storageServerTracker(
 	TCServerInfo* server, // This actor is owned by this TCServerInfo, point to server_info[id]
 	Promise<Void> errorOut, Version addedVersion) {
 	state Future<Void> failureTracker;
-	state ServerStatus status( false, false, false, server->lastKnownInterface.locality );
+	state ServerStatus status( false, false, server->lastKnownInterface.locality );
 	state bool lastIsUnhealthy = false;
 	state Future<Void> metricsTracker = serverMetricsPolling( server, self->cx );
 	state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;
 	state Future<Void> storeTypeTracker = keyValueStoreTypeTracker(self, server);
@@ -3504,9 +3511,8 @@
 	try {
 		loop {
-			status.isUndesired = false;
+			status.isUndesired = server->ssVersionTooFarBehind.get();
 			status.isWrongConfiguration = false;
-			status.isTooFarBehind = server->ssVersionTooFarBehind.get();
 			hasWrongDC = !isCorrectDC(self, server);
 			hasInvalidLocality =
 			    !self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
@@ -3615,6 +3621,7 @@
 				self->restartRecruiting.trigger();
 			}
+			if (lastIsUnhealthy && !status.isUnhealthy() &&
+			    ( server->teams.size() < targetTeamNumPerServer || self->lastBuildTeamsFailed)) {
 				self->doBuildTeams = true;
@@ -3622,11 +3629,10 @@
 			}
 			lastIsUnhealthy = status.isUnhealthy();

-			if(status.isTooFarBehind != server->ssVersionTooFarBehind.get()) {
-				TraceEvent("SSVersionHEREE", self->distributorId); // TODO: Remove
+			if(server->ssVersionTooFarBehind.get() && !status.isUndesired) {
 				continue;
 			}
 			state bool recordTeamCollectionInfo = false;
 			choose {
 				when(wait(failureTracker)) {
@@ -3636,7 +3642,7 @@
 					if(server->updated.canBeSet()) {
 						server->updated.send(Void());
 					}
 					// Remove server from FF/serverList
 					wait( removeStorageServer( cx, server->id, self->lock ) );
@@ -3745,7 +3751,7 @@
 					interfaceChanged = server->onInterfaceChanged;
 					// Old failureTracker for the old interface will be actorCancelled since the handler of the old
 					// actor now points to the new failure monitor actor.
-					status = ServerStatus( status.isFailed, status.isUndesired, status.isTooFarBehind, server->lastKnownInterface.locality );
+					status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality );
 					// self->traceTeamCollectionInfo();
 					recordTeamCollectionInfo = true;
@@ -3774,9 +3780,7 @@
 					server->wakeUpTracker = Promise<Void>();
 				}
 				when(wait(storeTypeTracker)) {}
-				when( wait(server->ssVersionTooFarBehind.onChange()) ) { // TODO: Remove
-					TraceEvent("SSVersionTooFarBehind", self->distributorId).detail("ServerId", server->id.toString()).detail("VersionDifference", server->ssVersionTooFarBehind.get());
-				}
+				when( wait(server->ssVersionTooFarBehind.onChange()) ) { }
 			}

 			if (recordTeamCollectionInfo) {
@@ -4869,7 +4873,7 @@ DDTeamCollection* testTeamCollection(int teamSize, Reference<IReplicationPolicy>
 		interface.locality.set(LiteralStringRef("zoneid"), Standalone<StringRef>(std::to_string(id % 5)));
 		interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3)));
 		collection->server_info[uid] = Reference<TCServerInfo>(new TCServerInfo(interface, ProcessClass(), true, collection->storageServerSet));
-		collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
+		collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
 		collection->checkAndCreateMachine(collection->server_info[uid]);
 	}
@@ -4911,7 +4915,7 @@ DDTeamCollection* testMachineTeamCollection(int teamSize, Reference<IReplication
 		collection->server_info[uid] =
 		    Reference<TCServerInfo>(new TCServerInfo(interface, ProcessClass(), true, collection->storageServerSet));
-		collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality));
+		collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
 	}
 	int totalServerIndex = collection->constructMachinesFromServers();


@@ -215,6 +215,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
 	init( DD_VALIDATE_LOCALITY, true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
 	init( DD_CHECK_INVALID_LOCALITY_DELAY, 60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
 	init( DD_ENABLE_VERBOSE_TRACING, false ); if( randomize && BUGGIFY ) DD_ENABLE_VERBOSE_TRACING = true;
+	init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
+	init( DD_SS_ALLOWED_VERSIONLAG, 200000000 ); if( randomize && BUGGIFY ) { DD_SS_FAILURE_VERSIONLAG = deterministicRandom()->randomInt(15000000, 500000000); DD_SS_ALLOWED_VERSIONLAG = 0.75 * DD_SS_FAILURE_VERSIONLAG; }

 	// TeamRemover
 	init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true


@@ -173,7 +173,9 @@ public:
 	bool DD_VALIDATE_LOCALITY;
 	int DD_CHECK_INVALID_LOCALITY_DELAY;
 	bool DD_ENABLE_VERBOSE_TRACING;
+	int64_t DD_SS_FAILURE_VERSIONLAG;
+	int64_t DD_SS_ALLOWED_VERSIONLAG;

 	// TeamRemover to remove redundant teams
 	bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor
 	double TR_REMOVE_MACHINE_TEAM_DELAY; // wait for the specified time before try to remove next machine team


@@ -66,6 +66,13 @@ struct KillRegionWorkload : TestWorkload {
 		return Void();
 	}

+	ACTOR static Future<Void> waitForStorageRecovered( KillRegionWorkload *self ) {
+		while( self->dbInfo->get().recoveryState < RecoveryState::STORAGE_RECOVERED ) {
+			wait( self->dbInfo->onChange() );
+		}
+		return Void();
+	}
+
 	ACTOR static Future<Void> killRegion( KillRegionWorkload *self, Database cx ) {
 		ASSERT( g_network->isSimulated() );
 		if(deterministicRandom()->random01() < 0.5) {
@@ -94,10 +101,13 @@
 		TraceEvent("ForceRecovery_GotConfig").detail("Conf", conf.toString());
 		if(conf.usableRegions>1) {
-			//only needed if force recovery was unnecessary and we killed the secondary
-			wait( success( changeConfig( cx, g_simulator.disablePrimary + " repopulate_anti_quorum=1", true ) ) );
-			while( self->dbInfo->get().recoveryState < RecoveryState::STORAGE_RECOVERED ) {
-				wait( self->dbInfo->onChange() );
+			loop {
+				//only needed if force recovery was unnecessary and we killed the secondary
+				wait( success( changeConfig( cx, g_simulator.disablePrimary + " repopulate_anti_quorum=1", true ) ) );
+				choose {
+					when( wait( waitForStorageRecovered(self) ) ) { break; }
+					when( wait( delay(300.0) ) ) { }
+				}
 			}
 			wait( success( changeConfig( cx, "usable_regions=1", true ) ) );
 		}