diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 0f12e5e3af..c5b056a177 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1187,7 +1187,7 @@ ACTOR Future> getValue( Future version, Key key, Databa } ACTOR Future getKey( Database cx, KeySelector k, Future version, TransactionInfo info ) { - state Version ver = wait(version); + Version ver = wait(version); if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion"); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 0ded9c2d81..12803d7ad3 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1784,7 +1784,7 @@ ACTOR Future recoverTagFromLogSystem( TLogData* self, Reference l return Void(); } -ACTOR Future updateLogSystem(TLogData* self, Reference logData, LogSystemConfig recoverFrom, Reference>> logSystem) { +ACTOR Future updateLogSystem(TLogData* self, Reference logData, Optional syncLogId, LogSystemConfig recoverFrom, Reference>> logSystem) { loop { TraceEvent("TLogUpdate", self->dbgid).detail("logId", logData->logId).detail("recoverFrom", recoverFrom.toString()).detail("dbInfo", self->dbInfo->get().logSystemConfig.toString()); for(auto it : self->dbInfo->get().logSystemConfig.oldTLogs) { @@ -1797,7 +1797,7 @@ ACTOR Future updateLogSystem(TLogData* self, Reference logData, L found = true; } else if( self->dbInfo->get().logSystemConfig.isNextGenerationOf(recoverFrom) ) { for( auto& it : self->dbInfo->get().logSystemConfig.tLogs ) { - if( std::count(it.tLogs.begin(), it.tLogs.end(), logData->logId ) ) { + if( std::count(it.tLogs.begin(), it.tLogs.end(), syncLogId.present() ? syncLogId.get() : logData->logId ) ) { logSystem->set(ILogSystem::fromOldLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig )); found = true; break; @@ -1811,13 +1811,13 @@ ACTOR Future updateLogSystem(TLogData* self, Reference logData, L } } -ACTOR Future recoverFromLogSystem( TLogData* self, Reference logData, LogSystemConfig recoverFrom, Version recoverAt, Version knownCommittedVersion, std::vector recoverTags, Promise copyComplete ) { +ACTOR Future recoverFromLogSystem( TLogData* self, Reference logData, Optional syncLogId, LogSystemConfig recoverFrom, Version recoverAt, Version knownCommittedVersion, std::vector recoverTags, Promise copyComplete ) { state Future committing = Void(); state double lastCommitT = now(); state Reference> uncommittedBytes = Reference>(new AsyncVar()); state std::vector> recoverFutures; state Reference>> logSystem = Reference>>(new AsyncVar>()); - state Future updater = updateLogSystem(self, logData, recoverFrom, logSystem); + state Future updater = updateLogSystem(self, logData, syncLogId, recoverFrom, logSystem); for(auto tag : recoverTags ) recoverFutures.push_back(recoverTagFromLogSystem(self, logData, knownCommittedVersion, recoverAt, tag, uncommittedBytes, logSystem)); @@ -1940,7 +1940,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit } logData->recoveringBefore = req.knownCommittedVersion; - logData->recovery = recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete ); + logData->recovery = recoverFromLogSystem( self, logData, req.syncLogId, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete ); Void _ = wait(copyComplete.getFuture() || logData->removed ); } else { // Brand new tlog, initialization has already been done by caller diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index f316dce5cb..a5d4b28d02 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -957,6 +957,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted newRemoteEpoch( TagPartitionedLogSystem* self, Reference oldLogSystem, Future fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, uint16_t minTag, int8_t remoteLocality ) { + TraceEvent("RemoteLogRecruitment_WaitingForWorkers"); state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers ); state Reference logSet = Reference( new LogSet() ); @@ -977,6 +978,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedTLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); } + TraceEvent("RemoteLogRecruitment_RecruitingLogRouters"); Void _ = wait( waitForAll(logRouterInitializationReplies) ); for( int i = 0; i < logRouterInitializationReplies.size(); i++ ) { @@ -993,6 +995,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0]->logServers[0]->get().id(); req.storeType = configuration.tLogDataStoreType; req.recoverFrom = oldLogSystem->getLogSystemConfig(); req.recoverAt = oldLogSystem->epochEndVersion.get(); @@ -1017,6 +1020,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedTLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + TraceEvent("RemoteLogRecruitment_InitializingRemoteLogs"); Void _ = wait( waitForAll(remoteTLogInitializationReplies) ); for( int i = 0; i < remoteTLogInitializationReplies.size(); i++ ) { @@ -1031,7 +1035,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedremoteRecoveryComplete = waitForAll(recoveryComplete); logSet->logRouters.resize(remoteWorkers.remoteTLogs.size()); self->tLogs.push_back( logSet ); - + TraceEvent("RemoteLogRecruitment_CompletingRecovery"); return Void(); } diff --git a/fdbserver/WorkerInterface.h b/fdbserver/WorkerInterface.h index 9726b43d08..6d3cd4b3a3 100644 --- a/fdbserver/WorkerInterface.h +++ b/fdbserver/WorkerInterface.h @@ -69,6 +69,7 @@ struct WorkerInterface { struct InitializeTLogRequest { UID recruitmentID; + Optional syncLogId; LogSystemConfig recoverFrom; Version recoverAt; Version knownCommittedVersion; @@ -82,7 +83,7 @@ struct InitializeTLogRequest { template void serialize( Ar& ar ) { - ar & recruitmentID & recoverFrom & recoverAt & knownCommittedVersion & epoch & recoverTags & storeType & remoteTag & reply; + ar & recruitmentID & syncLogId & recoverFrom & recoverAt & knownCommittedVersion & epoch & recoverTags & storeType & remoteTag & reply; } };