fix: the tlog did not cancel recovery properly when stopped

Evan Tschannen 2018-01-12 17:18:14 -08:00
parent 2e6ce03224
commit be643d6937
1 changed file with 31 additions and 13 deletions
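
The fix replaces the old pattern of blanking logData->recovery with Void() by a once-settable Promise<bool> recoverySuccessful: the stop/remove paths complete it with false, the recovery path completes it with true, and respondToRecovered uses the value to decide whether to acknowledge recoveryFinished requests. As a rough illustration of that idea outside of Flow, here is a minimal standalone C++ sketch; OncePromise and the surrounding names are hypothetical stand-ins, not FoundationDB APIs.

// Minimal standalone sketch (not Flow/FoundationDB code): a once-settable
// boolean promise that either the stop path (false) or the recovery path
// (true) completes first; later senders are ignored, and the responder only
// acknowledges "recovery finished" when recovery actually completed.
#include <atomic>
#include <future>
#include <iostream>

class OncePromise {                        // stand-in for Flow's Promise<bool>
public:
	OncePromise() : future_(promise_.get_future().share()) {}
	bool canBeSet() const { return !set_.load(); }
	void send(bool value) {
		bool expected = false;
		if (set_.compare_exchange_strong(expected, true))
			promise_.set_value(value);     // first sender wins
	}
	std::shared_future<bool> getFuture() const { return future_; }
private:
	std::promise<bool> promise_;
	std::shared_future<bool> future_;
	std::atomic<bool> set_{false};
};

int main() {
	OncePromise recoverySuccessful;

	// Stop/remove path (tLogLock, removeLog, tLogStart): cancel recovery.
	if (recoverySuccessful.canBeSet())
		recoverySuccessful.send(false);

	// Recovery path (recoverFromLogSystem): would report success, but the
	// stop path already won the race above, so this send is a no-op.
	if (recoverySuccessful.canBeSet())
		recoverySuccessful.send(true);

	// Responder path (respondToRecovered): only acknowledge recovery-finished
	// requests if recovery really completed; otherwise never reply.
	bool finishedRecovery = recoverySuccessful.getFuture().get();
	std::cout << (finishedRecovery ? "reply Void()" : "do not reply") << "\n";
	return 0;
}

In the diff below, Flow's Promise<bool>/Future<bool> plays this role; the first-writer-wins, set-at-most-once semantics (guarded by canBeSet()) are the core of the fix.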


@@ -391,6 +391,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	TLogInterface tli;
 	PromiseStream<Future<Void>> addActor;
 	TLogData* tLogData;
+	Promise<bool> recoverySuccessful;
 	Future<Void> recovery;
 	explicit LogData(TLogData* tLogData, TLogInterface interf) : tLogData(tLogData), knownCommittedVersion(0), tli(interf), logId(interf.id()),
@@ -451,7 +452,9 @@ ACTOR Future<Void> tLogLock( TLogData* self, ReplyPromise< TLogLockResult > repl
 	TraceEvent("TLogStop", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get());
 	logData->stopped = true;
-	logData->recovery = Void();
+	if(logData->recoverySuccessful.canBeSet()) {
+		logData->recoverySuccessful.send(false);
+	}
 	// Lock once the current version has been committed
 	Void _ = wait( logData->queueCommittedVersion.whenAtLeast( stopVersion ) );
@@ -1135,12 +1138,22 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
 	}
 }
-ACTOR Future<Void> respondToRecovered( TLogInterface tli, Future<Void> recovery ) {
-	Void _ = wait( recovery );
+ACTOR Future<Void> respondToRecovered( TLogInterface tli, Promise<bool> recoverySuccessful, Future<Void> recovery ) {
+	Void _ = wait( success(recoverySuccessful.getFuture()) || recovery );
+	ASSERT(recoverySuccessful.isSet());
+	state bool finishedRecovery = recoverySuccessful.getFuture().get();
+	if(!finishedRecovery) {
+		recovery = Void();
+	}
 	loop {
 		TLogRecoveryFinishedRequest req = waitNext( tli.recoveryFinished.getFuture() );
-		req.reply.send(Void());
+		if(finishedRecovery) {
+			req.reply.send(Void());
+		} else {
+			req.reply.send(Never());
+		}
 	}
 }
@@ -1216,7 +1229,9 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
 void removeLog( TLogData* self, Reference<LogData> logData ) {
 	TraceEvent("TLogRemoved", logData->logId).detail("input", logData->bytesInput.getValue()).detail("durable", logData->bytesDurable.getValue());
 	logData->stopped = true;
-	logData->recovery = Void();
+	if(logData->recoverySuccessful.canBeSet()) {
+		logData->recoverySuccessful.send(false);
+	}
 	logData->addActor = PromiseStream<Future<Void>>(); //there could be items still in the promise stream if one of the actors threw an error immediately
 	self->id_data.erase(logData->logId);
@@ -1249,12 +1264,8 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData ) {
 	state Future<Void> warningCollector = timeoutWarningCollector( warningCollectorInput.getFuture(), 1.0, "TLogQueueCommitSlow", self->dbgid );
 	state Future<Void> error = actorCollection( logData->addActor.getFuture() );
-	if( logData->recovery.isValid() && !logData->recovery.isReady()) {
-		logData->addActor.send( logData->recovery );
-	}
+	logData->addActor.send( logData->recovery );
 	logData->addActor.send( waitFailureServer(logData->tli.waitFailure.getFuture()) );
-	logData->addActor.send( respondToRecovered(logData->tli, logData->recovery) );
 	logData->addActor.send( logData->removed );
 	//FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance
 	logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
@@ -1655,6 +1666,9 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
 		TraceEvent("TLogRecoveryComplete", logData->logId).detail("Locality", self->dbInfo->get().myLocality.toString());
 		TEST(true); // tLog restore from old log system completed
+		if(logData->recoverySuccessful.canBeSet()) {
+			logData->recoverySuccessful.send(true);
+		}
 		return Void();
 	} catch( Error &e ) {
 		TraceEvent("TLogRecoveryError", logData->logId).error(e,true);
@@ -1678,7 +1692,9 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 	for(auto it : self->id_data) {
 		it.second->stopped = true;
-		it.second->recovery = Void();
+		if(it.second->recoverySuccessful.canBeSet()) {
+			it.second->recoverySuccessful.send(false);
+		}
 	}
 	state Reference<LogData> logData = Reference<LogData>( new LogData(self, recruited) );
@@ -1706,7 +1722,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
 		state Promise<Void> copyComplete;
 		TraceEvent("TLogRecover", self->dbgid).detail("logId", logData->logId).detail("at", req.recoverAt).detail("known", req.knownCommittedVersion).detail("tags", describe(req.recoverTags));
-		logData->recovery = recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete );
+		logData->recovery = respondToRecovered( recruited, logData->recoverySuccessful, recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete ) );
 		Void _ = wait(copyComplete.getFuture() || logData->removed );
 	} else {
 		// Brand new tlog, initialization has already been done by caller
@@ -1778,7 +1794,9 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 		}
 		for( auto& it : self.id_data ) {
-			it.second->recovery = Void();
+			if(it.second->recoverySuccessful.canBeSet()) {
+				it.second->recoverySuccessful.send(false);
+			}
 		}
 		if (tlogTerminated( &self, persistentData, self.persistentQueue, e )) {