fix: remote logs are not in the log system until the recovery is complete, so they cannot be used to determine whether this is the correct log system to recover from
parent 2a869a4178
commit 63751fb0e2
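A short illustration of the check this commit changes. The types and helper below (UID with an integer value, LogConfig, configCoversThisRecovery) are made up for the sketch and are not FoundationDB's API; the real check is the std::count over the configured tlog ids inside updateLogSystem(), shown in the hunks that follow. The idea, per the commit message: a remote tlog's own id is not added to the log system configuration until its recovery completes, so testing membership with logData->logId never succeeds for a remote log; the fix threads through a syncLogId (the id of a primary log server that is already in the configuration) and tests with that instead, falling back to the tlog's own id when no syncLogId was supplied.

// Hedged sketch, not FoundationDB code: models the membership test in updateLogSystem().
#include <algorithm>
#include <cstdio>
#include <optional>
#include <vector>

struct UID { int value; bool operator==(const UID& o) const { return value == o.value; } };

// Ids of the tlogs that the (possibly next-generation) configuration knows about.
struct LogConfig { std::vector<UID> tLogIds; };

// The diff's check: prefer the sync log id (a primary log server already in the
// config) and fall back to our own id, which is what the old code always used.
bool configCoversThisRecovery(const LogConfig& cfg, UID ownId, std::optional<UID> syncLogId) {
    UID probe = syncLogId ? *syncLogId : ownId;
    return std::count(cfg.tLogIds.begin(), cfg.tLogIds.end(), probe) > 0;
}

int main() {
    UID primary{1}, remote{2};
    // A remote tlog is not added to the configuration until remote recovery finishes,
    // so only the primary's id is listed here.
    LogConfig nextGen{{primary}};

    std::printf("own id only:    %d\n", configCoversThisRecovery(nextGen, remote, std::nullopt)); // 0: the old behaviour
    std::printf("with syncLogId: %d\n", configCoversThisRecovery(nextGen, remote, primary));      // 1: the fix
}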
@@ -1187,7 +1187,7 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
}

ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, TransactionInfo info ) {
    state Version ver = wait(version);
    Version ver = wait(version);

    if( info.debugID.present() )
        g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion");
@@ -1784,7 +1784,7 @@ ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> l
    return Void();
}

ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, LogSystemConfig recoverFrom, Reference<AsyncVar<Reference<ILogSystem>>> logSystem) {
ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, Optional<UID> syncLogId, LogSystemConfig recoverFrom, Reference<AsyncVar<Reference<ILogSystem>>> logSystem) {
    loop {
        TraceEvent("TLogUpdate", self->dbgid).detail("logId", logData->logId).detail("recoverFrom", recoverFrom.toString()).detail("dbInfo", self->dbInfo->get().logSystemConfig.toString());
        for(auto it : self->dbInfo->get().logSystemConfig.oldTLogs) {

@@ -1797,7 +1797,7 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
            found = true;
        } else if( self->dbInfo->get().logSystemConfig.isNextGenerationOf(recoverFrom) ) {
            for( auto& it : self->dbInfo->get().logSystemConfig.tLogs ) {
                if( std::count(it.tLogs.begin(), it.tLogs.end(), logData->logId ) ) {
                if( std::count(it.tLogs.begin(), it.tLogs.end(), syncLogId.present() ? syncLogId.get() : logData->logId ) ) {
                    logSystem->set(ILogSystem::fromOldLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig ));
                    found = true;
                    break;

@@ -1811,13 +1811,13 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
    }
}

ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logData, LogSystemConfig recoverFrom, Version recoverAt, Version knownCommittedVersion, std::vector<Tag> recoverTags, Promise<Void> copyComplete ) {
ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logData, Optional<UID> syncLogId, LogSystemConfig recoverFrom, Version recoverAt, Version knownCommittedVersion, std::vector<Tag> recoverTags, Promise<Void> copyComplete ) {
    state Future<Void> committing = Void();
    state double lastCommitT = now();
    state Reference<AsyncVar<int>> uncommittedBytes = Reference<AsyncVar<int>>(new AsyncVar<int>());
    state std::vector<Future<Void>> recoverFutures;
    state Reference<AsyncVar<Reference<ILogSystem>>> logSystem = Reference<AsyncVar<Reference<ILogSystem>>>(new AsyncVar<Reference<ILogSystem>>());
    state Future<Void> updater = updateLogSystem(self, logData, recoverFrom, logSystem);
    state Future<Void> updater = updateLogSystem(self, logData, syncLogId, recoverFrom, logSystem);

    for(auto tag : recoverTags )
        recoverFutures.push_back(recoverTagFromLogSystem(self, logData, knownCommittedVersion, recoverAt, tag, uncommittedBytes, logSystem));

@@ -1940,7 +1940,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
        }

        logData->recoveringBefore = req.knownCommittedVersion;
        logData->recovery = recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete );
        logData->recovery = recoverFromLogSystem( self, logData, req.syncLogId, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete );
        Void _ = wait(copyComplete.getFuture() || logData->removed );
    } else {
        // Brand new tlog, initialization has already been done by caller
@@ -957,6 +957,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS

    ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, uint16_t minTag, int8_t remoteLocality )
    {
        TraceEvent("RemoteLogRecruitment_WaitingForWorkers");
        state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );

        state Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );

@@ -977,6 +978,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
            logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
        }

        TraceEvent("RemoteLogRecruitment_RecruitingLogRouters");
        Void _ = wait( waitForAll(logRouterInitializationReplies) );

        for( int i = 0; i < logRouterInitializationReplies.size(); i++ ) {

@@ -993,6 +995,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
        for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) {
            InitializeTLogRequest &req = remoteTLogReqs[i];
            req.recruitmentID = remoteRecruitmentID;
            req.syncLogId = self->tLogs[0]->logServers[0]->get().id();
            req.storeType = configuration.tLogDataStoreType;
            req.recoverFrom = oldLogSystem->getLogSystemConfig();
            req.recoverAt = oldLogSystem->epochEndVersion.get();

@@ -1017,6 +1020,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
        for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ )
            remoteTLogInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor( remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );

        TraceEvent("RemoteLogRecruitment_InitializingRemoteLogs");
        Void _ = wait( waitForAll(remoteTLogInitializationReplies) );

        for( int i = 0; i < remoteTLogInitializationReplies.size(); i++ ) {

@@ -1031,7 +1035,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
        self->remoteRecoveryComplete = waitForAll(recoveryComplete);
        logSet->logRouters.resize(remoteWorkers.remoteTLogs.size());
        self->tLogs.push_back( logSet );

        TraceEvent("RemoteLogRecruitment_CompletingRecovery");
        return Void();
    }

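The recruiting side supplies the id that the remote tlog will look for. In newRemoteEpoch() above, every remote tlog's InitializeTLogRequest gets req.syncLogId = self->tLogs[0]->logServers[0]->get().id(), the id of a primary log server from the same epoch, which, unlike the remote tlog itself, is presumably already listed in the log system configuration published through dbInfo by the time the remote tlog consults it. A minimal sketch of that pairing, with made-up request and container types rather than the real interfaces:

// Hedged sketch, hypothetical types: each remote tlog is told one primary log
// server's id so it has something that already exists in the new configuration.
#include <cstdio>
#include <vector>

struct UID { int value; };

struct InitRequest {   // stand-in for InitializeTLogRequest
    UID syncLogId{};   // id the remote tlog will look for in dbInfo's config
};

int main() {
    std::vector<UID> primaryLogServers = {{101}, {102}, {103}};
    std::vector<InitRequest> remoteTLogReqs(4);

    // Mirrors the diff: every remote request carries the first primary log's id.
    for (auto& req : remoteTLogReqs)
        req.syncLogId = primaryLogServers.front();

    for (const auto& req : remoteTLogReqs)
        std::printf("syncLogId = %d\n", req.syncLogId.value);
}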
@@ -69,6 +69,7 @@ struct WorkerInterface {

struct InitializeTLogRequest {
    UID recruitmentID;
    Optional<UID> syncLogId;
    LogSystemConfig recoverFrom;
    Version recoverAt;
    Version knownCommittedVersion;

@@ -82,7 +83,7 @@ struct InitializeTLogRequest {

    template <class Ar>
    void serialize( Ar& ar ) {
        ar & recruitmentID & recoverFrom & recoverAt & knownCommittedVersion & epoch & recoverTags & storeType & remoteTag & reply;
        ar & recruitmentID & syncLogId & recoverFrom & recoverAt & knownCommittedVersion & epoch & recoverTags & storeType & remoteTag & reply;
    }
};

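Because the ar & chain streams fields positionally, the new field has to be added in two places at once: the member list and the serialize() body, in matching positions. The sketch below uses a made-up byte-counting archive, not FoundationDB's real serializer, just to show that pairing; presumably the sender and receiver must both be running code that agrees on the new field order.

// Hedged sketch with a made-up archive type: shows why a new member must also be
// added to the serialize() chain, in the same position on both sides of the wire.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <optional>

struct CountingArchive {
    std::size_t bytes = 0;
    template <class T>
    CountingArchive& operator&(const T&) { bytes += sizeof(T); return *this; }
};

struct Request {
    uint64_t recruitmentID = 0;
    std::optional<uint64_t> syncLogId;   // new field added by the commit
    uint64_t recoverAt = 0;

    template <class Ar>
    void serialize(Ar& ar) {
        // Field order defines the wire layout; syncLogId sits right after
        // recruitmentID, matching its position in the member list.
        ar & recruitmentID & syncLogId & recoverAt;
    }
};

int main() {
    CountingArchive ar;
    Request req;
    req.serialize(ar);
    std::printf("serialized %zu bytes (layout fixed by the & chain)\n", ar.bytes);
}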