fix: if the tagPartitionedLogSystem cannot do a forced recovery, the master should not execute it forced recovery based modifications either

This commit is contained in:
Evan Tschannen 2019-02-18 15:13:18 -08:00
parent dcca22038c
commit 9cfadad41b
3 changed files with 18 additions and 8 deletions

View File

@ -649,7 +649,7 @@ struct ILogSystem {
static Reference<ILogSystem> fromOldLogSystemConfig( UID const& dbgid, struct LocalityData const&, struct LogSystemConfig const& );
// Constructs a new ILogSystem implementation from the old log data within a ServerDBInfo/LogSystemConfig. Might return a null reference if there isn't a fully recovered log system available.
static Future<Void> recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool forceRecovery);
static Future<Void> recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool* forceRecovery);
// Constructs a new ILogSystem implementation based on the given oldState and rejoining log servers
// Ensures that any calls to push or confirmEpochLive in the current epoch but strictly later than change_epoch will not return
// Whenever changes in the set of available log servers require restarting recovery with a different end sequence, outLogSystem will be changed to a new ILogSystem

View File

@ -117,7 +117,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return dbgid;
}
static Future<Void> recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool forceRecovery) {
static Future<Void> recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool* forceRecovery) {
return epochEnd( outLogSystem, dbgid, oldState, rejoins, locality, forceRecovery );
}
@ -1254,7 +1254,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return Void();
}
ACTOR static Future<Void> epochEnd( Reference<AsyncVar<Reference<ILogSystem>>> outLogSystem, UID dbgid, DBCoreState prevState, FutureStream<TLogRejoinRequest> rejoinRequests, LocalityData locality, bool forceRecovery ) {
ACTOR static Future<Void> epochEnd( Reference<AsyncVar<Reference<ILogSystem>>> outLogSystem, UID dbgid, DBCoreState prevState, FutureStream<TLogRejoinRequest> rejoinRequests, LocalityData locality, bool* forceRecovery ) {
// Stops a co-quorum of tlogs so that no further versions can be committed until the DBCoreState coordination state is changed
// Creates a new logSystem representing the (now frozen) epoch
// No other important side effects.
@ -1272,7 +1272,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
throw internal_error();
}
if(forceRecovery) {
if(*forceRecovery) {
DBCoreState modifiedState = prevState;
int8_t primaryLocality = -1;
@ -1345,10 +1345,20 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
prevState = modifiedState;
} else {
forceRecovery = false;
*forceRecovery = false;
}
} else {
*forceRecovery = false;
}
TraceEvent(SevWarnAlways, "ForcedRecovery", dbgid).detail("PrimaryLocality", primaryLocality).detail("RemoteLocality", remoteLocality).detail("FoundRemote", foundRemote).detail("Modified", modifiedLogSets).detail("Removed", removedLogSets);
for(int i = 0; i < prevState.tLogs.size(); i++) {
TraceEvent("ForcedRecoveryTLogs", dbgid).detail("I", i).detail("Log", ::describe(prevState.tLogs[i].tLogs)).detail("Loc", prevState.tLogs[i].locality);
}
for(int i = 0; i < prevState.oldTLogData.size(); i++) {
for(int j = 0; j < prevState.oldTLogData[i].tLogs.size(); j++) {
TraceEvent("ForcedRecoveryTLogs", dbgid).detail("I", i).detail("J",j).detail("Log", ::describe(prevState.oldTLogData[i].tLogs[j].tLogs)).detail("Loc", prevState.oldTLogData[i].tLogs[j].locality);
}
}
}
TEST( true ); // Master recovery from pre-existing database
@ -1455,7 +1465,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if(forceRecovery) {
if(*forceRecovery) {
state std::vector<LogLockInfo> allLockResults;
ASSERT( lockResults.size() == 1 );
allLockResults.push_back(lockResults[0]);
@ -2290,7 +2300,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
};
};
Future<Void> ILogSystem::recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool forceRecovery) {
Future<Void> ILogSystem::recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool* forceRecovery) {
return TagPartitionedLogSystem::recoverAndEndEpoch( outLogSystem, dbgid, oldState, rejoins, locality, forceRecovery );
}

View File

@ -1234,7 +1234,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
.trackLatest("MasterRecoveryState");
state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality, self->forceRecovery);
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality, &self->forceRecovery);
DBCoreState newState = self->cstate.myDBState;
newState.recoveryCount++;