2017-05-26 04:48:44 +08:00
/*
* TagPartitionedLogSystem . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# include "flow/actorcompiler.h"
# include "flow/ActorCollection.h"
# include "LogSystem.h"
# include "ServerDBInfo.h"
# include "DBCoreState.h"
# include "WaitFailure.h"
# include "fdbclient/SystemData.h"
# include "fdbrpc/simulator.h"
# include "fdbrpc/Replication.h"
# include "fdbrpc/ReplicationUtils.h"
2017-10-25 07:28:50 +08:00
# include "RecoveryState.h"
2017-05-26 04:48:44 +08:00
ACTOR static Future < Void > reportTLogCommitErrors ( Future < Void > commitReply , UID debugID ) {
try {
Void _ = wait ( commitReply ) ;
return Void ( ) ;
} catch ( Error & e ) {
if ( e . code ( ) = = error_code_broken_promise )
throw master_tlog_failed ( ) ;
else if ( e . code ( ) ! = error_code_actor_cancelled & & e . code ( ) ! = error_code_tlog_stopped )
TraceEvent ( SevError , " MasterTLogCommitRequestError " , debugID ) . error ( e ) ;
throw ;
}
}
2017-07-10 05:46:16 +08:00
struct OldLogData {
2017-07-12 06:48:10 +08:00
std : : vector < Reference < LogSet > > tLogs ;
2018-04-09 12:24:05 +08:00
int32_t logRouterTags ;
2017-05-26 04:48:44 +08:00
Version epochEnd ;
2018-04-09 12:24:05 +08:00
OldLogData ( ) : epochEnd ( 0 ) , logRouterTags ( 0 ) { }
2017-05-26 04:48:44 +08:00
} ;
2018-03-30 06:12:38 +08:00
struct LogLockInfo {
2018-04-22 07:03:28 +08:00
Version epochEnd ;
2018-04-23 02:14:13 +08:00
bool isCurrent ;
2018-04-09 12:24:05 +08:00
Reference < LogSet > logSet ;
2018-03-30 06:12:38 +08:00
std : : vector < Future < TLogLockResult > > replies ;
2018-04-22 07:03:28 +08:00
2018-04-23 02:14:13 +08:00
LogLockInfo ( ) : epochEnd ( std : : numeric_limits < Version > : : max ( ) ) , isCurrent ( false ) { }
2018-03-30 06:12:38 +08:00
} ;
2017-05-26 04:48:44 +08:00
struct TagPartitionedLogSystem : ILogSystem , ReferenceCounted < TagPartitionedLogSystem > {
UID dbgid ;
2017-07-10 05:46:16 +08:00
int logSystemType ;
2017-07-12 06:48:10 +08:00
std : : vector < Reference < LogSet > > tLogs ;
2017-09-08 06:32:08 +08:00
int expectedLogSets ;
2018-04-09 12:24:05 +08:00
int logRouterTags ;
2018-04-21 04:25:22 +08:00
UID recruitmentID ;
2017-05-26 04:48:44 +08:00
// new members
Future < Void > rejoins ;
Future < Void > recoveryComplete ;
2017-07-10 05:46:16 +08:00
Future < Void > remoteRecovery ;
2017-07-14 03:29:21 +08:00
Future < Void > remoteRecoveryComplete ;
2018-03-30 06:12:38 +08:00
std : : vector < LogLockInfo > lockResults ;
2017-05-26 04:48:44 +08:00
bool recoveryCompleteWrittenToCoreState ;
2017-09-08 06:32:08 +08:00
bool remoteLogsWrittenToCoreState ;
2018-02-03 03:46:04 +08:00
bool hasRemoteServers ;
2017-05-26 04:48:44 +08:00
Optional < Version > epochEndVersion ;
2018-04-01 07:47:56 +08:00
std : : set < Tag > epochEndTags ;
2017-05-26 04:48:44 +08:00
Version knownCommittedVersion ;
LocalityData locality ;
2018-04-19 03:07:29 +08:00
std : : map < std : : pair < UID , Tag > , std : : pair < Version , Version > > outstandingPops ; // For each currently running popFromLog actor, (log server #, tag)->popped version
2017-05-26 04:48:44 +08:00
ActorCollection actors ;
std : : vector < OldLogData > oldLogData ;
2018-04-22 03:57:00 +08:00
AsyncTrigger logSystemConfigChanged ;
2017-05-26 04:48:44 +08:00
2018-04-09 12:24:05 +08:00
TagPartitionedLogSystem ( UID dbgid , LocalityData locality ) : dbgid ( dbgid ) , locality ( locality ) , actors ( false ) , recoveryCompleteWrittenToCoreState ( false ) , remoteLogsWrittenToCoreState ( false ) , logSystemType ( 0 ) , logRouterTags ( 0 ) , expectedLogSets ( 0 ) , hasRemoteServers ( false ) { }
2017-05-26 04:48:44 +08:00
virtual void stopRejoins ( ) {
rejoins = Future < Void > ( ) ;
}
virtual void addref ( ) {
ReferenceCounted < TagPartitionedLogSystem > : : addref ( ) ;
}
virtual void delref ( ) {
ReferenceCounted < TagPartitionedLogSystem > : : delref ( ) ;
}
virtual std : : string describe ( ) {
std : : string result ;
2017-07-10 05:46:16 +08:00
for ( int i = 0 ; i < tLogs . size ( ) ; i + + ) {
result = format ( " %d: " , i ) ;
2017-07-12 06:48:10 +08:00
for ( int j = 0 ; j < tLogs [ i ] - > logServers . size ( ) ; j + + ) {
result = result + tLogs [ i ] - > logServers [ j ] - > get ( ) . id ( ) . toString ( ) + ( ( j = = tLogs [ i ] - > logServers . size ( ) - 1 ) ? " " : " , " ) ;
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
return result ;
}
virtual UID getDebugID ( ) {
return dbgid ;
}
static Future < Void > recoverAndEndEpoch ( Reference < AsyncVar < Reference < ILogSystem > > > const & outLogSystem , UID const & dbgid , DBCoreState const & oldState , FutureStream < TLogRejoinRequest > const & rejoins , LocalityData const & locality ) {
return epochEnd ( outLogSystem , dbgid , oldState , rejoins , locality ) ;
}
2018-02-22 06:06:44 +08:00
static Reference < ILogSystem > fromLogSystemConfig ( UID const & dbgid , LocalityData const & locality , LogSystemConfig const & lsConf , bool excludeRemote ) {
2017-05-26 04:48:44 +08:00
ASSERT ( lsConf . logSystemType = = 2 | | ( lsConf . logSystemType = = 0 & & ! lsConf . tLogs . size ( ) ) ) ;
//ASSERT(lsConf.epoch == epoch); //< FIXME
Reference < TagPartitionedLogSystem > logSystem ( new TagPartitionedLogSystem ( dbgid , locality ) ) ;
2018-02-22 07:28:02 +08:00
logSystem - > tLogs . reserve ( lsConf . tLogs . size ( ) ) ;
2017-09-08 06:32:08 +08:00
logSystem - > expectedLogSets = lsConf . expectedLogSets ;
2018-04-09 12:24:05 +08:00
logSystem - > logRouterTags = lsConf . logRouterTags ;
2018-04-21 04:25:22 +08:00
logSystem - > recruitmentID = lsConf . recruitmentID ;
2017-07-12 06:48:10 +08:00
for ( int i = 0 ; i < lsConf . tLogs . size ( ) ; i + + ) {
TLogSet const & tLogSet = lsConf . tLogs [ i ] ;
2018-02-22 06:06:44 +08:00
if ( ! excludeRemote | | tLogSet . isLocal ) {
Reference < LogSet > logSet = Reference < LogSet > ( new LogSet ( ) ) ;
2018-02-22 07:28:02 +08:00
logSystem - > tLogs . push_back ( logSet ) ;
2018-02-22 06:06:44 +08:00
for ( auto & log : tLogSet . tLogs ) {
logSet - > logServers . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
}
for ( auto & log : tLogSet . logRouters ) {
logSet - > logRouters . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
}
logSet - > tLogWriteAntiQuorum = tLogSet . tLogWriteAntiQuorum ;
logSet - > tLogReplicationFactor = tLogSet . tLogReplicationFactor ;
logSet - > tLogPolicy = tLogSet . tLogPolicy ;
logSet - > tLogLocalities = tLogSet . tLogLocalities ;
logSet - > isLocal = tLogSet . isLocal ;
logSet - > hasBestPolicy = tLogSet . hasBestPolicy ;
logSet - > locality = tLogSet . locality ;
2018-03-30 06:12:38 +08:00
logSet - > startVersion = tLogSet . startVersion ;
2018-02-22 06:06:44 +08:00
logSet - > updateLocalitySet ( ) ;
filterLocalityDataForPolicy ( logSet - > tLogPolicy , & logSet - > tLogLocalities ) ;
2017-07-10 05:46:16 +08:00
}
}
2017-05-26 04:48:44 +08:00
logSystem - > oldLogData . resize ( lsConf . oldTLogs . size ( ) ) ;
for ( int i = 0 ; i < lsConf . oldTLogs . size ( ) ; i + + ) {
2017-07-12 06:48:10 +08:00
logSystem - > oldLogData [ i ] . tLogs . resize ( lsConf . oldTLogs [ i ] . tLogs . size ( ) ) ;
for ( int j = 0 ; j < lsConf . oldTLogs [ i ] . tLogs . size ( ) ; j + + ) {
Reference < LogSet > logSet = Reference < LogSet > ( new LogSet ( ) ) ;
logSystem - > oldLogData [ i ] . tLogs [ j ] = logSet ;
TLogSet const & tLogData = lsConf . oldTLogs [ i ] . tLogs [ j ] ;
for ( auto & log : tLogData . tLogs ) {
logSet - > logServers . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
for ( auto & log : tLogData . logRouters ) {
logSet - > logRouters . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
logSet - > tLogWriteAntiQuorum = tLogData . tLogWriteAntiQuorum ;
logSet - > tLogReplicationFactor = tLogData . tLogReplicationFactor ;
logSet - > tLogPolicy = tLogData . tLogPolicy ;
logSet - > tLogLocalities = tLogData . tLogLocalities ;
logSet - > isLocal = tLogData . isLocal ;
2018-01-30 09:48:18 +08:00
logSet - > hasBestPolicy = tLogData . hasBestPolicy ;
2017-09-08 06:32:08 +08:00
logSet - > locality = tLogData . locality ;
2018-03-30 06:12:38 +08:00
logSet - > startVersion = tLogData . startVersion ;
2017-07-10 05:46:16 +08:00
//logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs
2017-05-26 04:48:44 +08:00
}
2018-04-09 12:24:05 +08:00
logSystem - > oldLogData [ i ] . logRouterTags = lsConf . oldTLogs [ i ] . logRouterTags ;
2017-05-26 04:48:44 +08:00
logSystem - > oldLogData [ i ] . epochEnd = lsConf . oldTLogs [ i ] . epochEnd ;
}
logSystem - > logSystemType = lsConf . logSystemType ;
return logSystem ;
}
static Reference < ILogSystem > fromOldLogSystemConfig ( UID const & dbgid , LocalityData const & locality , LogSystemConfig const & lsConf ) {
ASSERT ( lsConf . logSystemType = = 2 | | ( lsConf . logSystemType = = 0 & & ! lsConf . tLogs . size ( ) ) ) ;
//ASSERT(lsConf.epoch == epoch); //< FIXME
Reference < TagPartitionedLogSystem > logSystem ( new TagPartitionedLogSystem ( dbgid , locality ) ) ;
if ( lsConf . oldTLogs . size ( ) ) {
2017-07-12 06:48:10 +08:00
logSystem - > tLogs . resize ( lsConf . oldTLogs [ 0 ] . tLogs . size ( ) ) ;
for ( int i = 0 ; i < lsConf . oldTLogs [ 0 ] . tLogs . size ( ) ; i + + ) {
Reference < LogSet > logSet = Reference < LogSet > ( new LogSet ( ) ) ;
logSystem - > tLogs [ i ] = logSet ;
TLogSet const & tLogSet = lsConf . oldTLogs [ 0 ] . tLogs [ i ] ;
for ( auto & log : tLogSet . tLogs ) {
logSet - > logServers . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
for ( auto & log : tLogSet . logRouters ) {
logSet - > logRouters . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
logSet - > tLogWriteAntiQuorum = tLogSet . tLogWriteAntiQuorum ;
logSet - > tLogReplicationFactor = tLogSet . tLogReplicationFactor ;
logSet - > tLogPolicy = tLogSet . tLogPolicy ;
logSet - > tLogLocalities = tLogSet . tLogLocalities ;
logSet - > isLocal = tLogSet . isLocal ;
2018-01-30 09:48:18 +08:00
logSet - > hasBestPolicy = tLogSet . hasBestPolicy ;
2017-09-08 06:32:08 +08:00
logSet - > locality = tLogSet . locality ;
2018-03-30 06:12:38 +08:00
logSet - > startVersion = tLogSet . startVersion ;
2017-07-12 06:48:10 +08:00
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
2017-07-10 05:46:16 +08:00
}
2018-04-09 12:24:05 +08:00
logSystem - > logRouterTags = lsConf . oldTLogs [ 0 ] . logRouterTags ;
2017-07-10 05:46:16 +08:00
//logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd;
2017-05-26 04:48:44 +08:00
logSystem - > oldLogData . resize ( lsConf . oldTLogs . size ( ) - 1 ) ;
for ( int i = 1 ; i < lsConf . oldTLogs . size ( ) ; i + + ) {
2017-07-12 06:48:10 +08:00
logSystem - > oldLogData [ i - 1 ] . tLogs . resize ( lsConf . oldTLogs [ i ] . tLogs . size ( ) ) ;
for ( int j = 0 ; j < lsConf . oldTLogs [ i ] . tLogs . size ( ) ; j + + ) {
Reference < LogSet > logSet = Reference < LogSet > ( new LogSet ( ) ) ;
logSystem - > oldLogData [ i - 1 ] . tLogs [ j ] = logSet ;
TLogSet const & tLogSet = lsConf . oldTLogs [ i ] . tLogs [ j ] ;
for ( auto & log : tLogSet . tLogs ) {
logSet - > logServers . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
for ( auto & log : tLogSet . logRouters ) {
logSet - > logRouters . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( log ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
logSet - > tLogWriteAntiQuorum = tLogSet . tLogWriteAntiQuorum ;
logSet - > tLogReplicationFactor = tLogSet . tLogReplicationFactor ;
logSet - > tLogPolicy = tLogSet . tLogPolicy ;
logSet - > tLogLocalities = tLogSet . tLogLocalities ;
logSet - > isLocal = tLogSet . isLocal ;
2018-01-30 09:48:18 +08:00
logSet - > hasBestPolicy = tLogSet . hasBestPolicy ;
2017-09-08 06:32:08 +08:00
logSet - > locality = tLogSet . locality ;
2018-03-30 06:12:38 +08:00
logSet - > startVersion = tLogSet . startVersion ;
2017-07-12 06:48:10 +08:00
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
2017-05-26 04:48:44 +08:00
}
2018-04-09 12:24:05 +08:00
logSystem - > oldLogData [ i - 1 ] . logRouterTags = lsConf . oldTLogs [ i ] . logRouterTags ;
2017-05-26 04:48:44 +08:00
logSystem - > oldLogData [ i - 1 ] . epochEnd = lsConf . oldTLogs [ i ] . epochEnd ;
}
}
logSystem - > logSystemType = lsConf . logSystemType ;
return logSystem ;
}
virtual void toCoreState ( DBCoreState & newState ) {
if ( recoveryComplete . isValid ( ) & & recoveryComplete . isError ( ) )
throw recoveryComplete . getError ( ) ;
2018-04-09 12:24:05 +08:00
if ( remoteRecoveryComplete . isValid ( ) & & remoteRecoveryComplete . isError ( ) )
throw remoteRecoveryComplete . getError ( ) ;
2017-05-26 04:48:44 +08:00
newState . tLogs . clear ( ) ;
2018-04-09 12:24:05 +08:00
newState . logRouterTags = logRouterTags ;
2017-07-10 05:46:16 +08:00
for ( auto & t : tLogs ) {
2018-04-09 12:24:05 +08:00
if ( t - > logServers . size ( ) ) {
CoreTLogSet coreSet ;
for ( auto & log : t - > logServers ) {
coreSet . tLogs . push_back ( log - > get ( ) . id ( ) ) ;
coreSet . tLogLocalities . push_back ( log - > get ( ) . interf ( ) . locality ) ;
}
coreSet . tLogWriteAntiQuorum = t - > tLogWriteAntiQuorum ;
coreSet . tLogReplicationFactor = t - > tLogReplicationFactor ;
coreSet . tLogPolicy = t - > tLogPolicy ;
coreSet . isLocal = t - > isLocal ;
coreSet . hasBestPolicy = t - > hasBestPolicy ;
coreSet . locality = t - > locality ;
coreSet . startVersion = t - > startVersion ;
newState . tLogs . push_back ( coreSet ) ;
}
2017-05-26 04:48:44 +08:00
}
newState . oldTLogData . clear ( ) ;
2018-04-09 12:24:05 +08:00
if ( ! recoveryComplete . isValid ( ) | | ! recoveryComplete . isReady ( ) | | ! remoteRecoveryComplete . isValid ( ) | | ! remoteRecoveryComplete . isReady ( ) ) {
2017-05-26 04:48:44 +08:00
newState . oldTLogData . resize ( oldLogData . size ( ) ) ;
for ( int i = 0 ; i < oldLogData . size ( ) ; i + + ) {
2017-07-10 05:46:16 +08:00
for ( auto & t : oldLogData [ i ] . tLogs ) {
2018-04-09 12:24:05 +08:00
if ( t - > logServers . size ( ) ) {
CoreTLogSet coreSet ;
for ( auto & log : t - > logServers ) {
coreSet . tLogs . push_back ( log - > get ( ) . id ( ) ) ;
}
coreSet . tLogLocalities = t - > tLogLocalities ;
coreSet . tLogWriteAntiQuorum = t - > tLogWriteAntiQuorum ;
coreSet . tLogReplicationFactor = t - > tLogReplicationFactor ;
coreSet . tLogPolicy = t - > tLogPolicy ;
coreSet . isLocal = t - > isLocal ;
coreSet . hasBestPolicy = t - > hasBestPolicy ;
coreSet . locality = t - > locality ;
coreSet . startVersion = t - > startVersion ;
newState . oldTLogData [ i ] . tLogs . push_back ( coreSet ) ;
2017-07-10 05:46:16 +08:00
}
}
2018-04-09 12:24:05 +08:00
newState . oldTLogData [ i ] . logRouterTags = oldLogData [ i ] . logRouterTags ;
2017-05-26 04:48:44 +08:00
newState . oldTLogData [ i ] . epochEnd = oldLogData [ i ] . epochEnd ;
}
}
newState . logSystemType = logSystemType ;
}
virtual Future < Void > onCoreStateChanged ( ) {
2017-09-08 06:32:08 +08:00
ASSERT ( recoveryComplete . isValid ( ) & & remoteRecovery . isValid ( ) ) ;
if ( recoveryComplete . isReady ( ) & & remoteRecovery . isReady ( ) ) {
2018-04-09 12:24:05 +08:00
if ( ! remoteRecoveryComplete . isReady ( ) ) {
return remoteRecoveryComplete ;
}
2017-05-26 04:48:44 +08:00
return Never ( ) ;
2017-09-08 06:32:08 +08:00
}
if ( remoteRecovery . isReady ( ) ) {
return recoveryComplete ;
}
if ( recoveryComplete . isReady ( ) ) {
return remoteRecovery ;
}
return recoveryComplete | | remoteRecovery ;
2017-05-26 04:48:44 +08:00
}
virtual void coreStateWritten ( DBCoreState const & newState ) {
2017-09-08 06:32:08 +08:00
if ( ! newState . oldTLogData . size ( ) ) {
2017-05-26 04:48:44 +08:00
recoveryCompleteWrittenToCoreState = true ;
2017-09-08 06:32:08 +08:00
}
for ( auto & t : newState . tLogs ) {
if ( ! t . isLocal ) {
remoteLogsWrittenToCoreState = true ;
break ;
}
}
2017-05-26 04:48:44 +08:00
}
virtual Future < Void > onError ( ) {
2018-02-03 03:46:04 +08:00
return onError_internal ( this ) ;
}
2017-05-26 04:48:44 +08:00
2018-02-03 03:46:04 +08:00
ACTOR static Future < Void > onError_internal ( TagPartitionedLogSystem * self ) {
// Never returns normally, but throws an error if the subsystem stops working
loop {
vector < Future < Void > > failed ;
vector < Future < Void > > changes ;
2018-04-09 12:24:05 +08:00
for ( auto & it : self - > tLogs ) {
2018-02-03 03:46:04 +08:00
for ( auto & t : it - > logServers ) {
if ( t - > get ( ) . present ( ) ) {
failed . push_back ( waitFailureClient ( t - > get ( ) . interf ( ) . waitFailure , SERVER_KNOBS - > TLOG_TIMEOUT , - SERVER_KNOBS - > TLOG_TIMEOUT / SERVER_KNOBS - > SECONDS_BEFORE_NO_FAILURE_DELAY ) ) ;
} else {
changes . push_back ( t - > onChange ( ) ) ;
}
2017-07-10 05:46:16 +08:00
}
2018-02-03 03:46:04 +08:00
for ( auto & t : it - > logRouters ) {
if ( t - > get ( ) . present ( ) ) {
failed . push_back ( waitFailureClient ( t - > get ( ) . interf ( ) . waitFailure , SERVER_KNOBS - > TLOG_TIMEOUT , - SERVER_KNOBS - > TLOG_TIMEOUT / SERVER_KNOBS - > SECONDS_BEFORE_NO_FAILURE_DELAY ) ) ;
} else {
changes . push_back ( t - > onChange ( ) ) ;
}
2017-07-10 05:46:16 +08:00
}
}
2018-04-09 12:24:05 +08:00
for ( auto & old : self - > oldLogData ) {
for ( auto & it : old . tLogs ) {
for ( auto & t : it - > logRouters ) {
if ( t - > get ( ) . present ( ) ) {
failed . push_back ( waitFailureClient ( t - > get ( ) . interf ( ) . waitFailure , SERVER_KNOBS - > TLOG_TIMEOUT , - SERVER_KNOBS - > TLOG_TIMEOUT / SERVER_KNOBS - > SECONDS_BEFORE_NO_FAILURE_DELAY ) ) ;
} else {
changes . push_back ( t - > onChange ( ) ) ;
}
}
}
}
2017-05-26 04:48:44 +08:00
2018-02-03 03:46:04 +08:00
if ( self - > hasRemoteServers & & ! self - > remoteRecovery . isReady ( ) ) {
changes . push_back ( self - > remoteRecovery ) ;
}
if ( ! changes . size ( ) ) {
changes . push_back ( Never ( ) ) ; //waiting on an empty vector will return immediately
}
ASSERT ( failed . size ( ) > = 1 ) ;
Void _ = wait ( quorum ( changes , 1 ) | | tagError < Void > ( quorum ( failed , 1 ) , master_tlog_failed ( ) ) | | self - > actors . getResult ( ) ) ;
}
2017-05-26 04:48:44 +08:00
}
virtual Future < Void > push ( Version prevVersion , Version version , Version knownCommittedVersion , LogPushData & data , Optional < UID > debugID ) {
// FIXME: Randomize request order as in LegacyLogSystem?
2017-07-10 05:46:16 +08:00
vector < Future < Void > > quorumResults ;
int location = 0 ;
2017-07-12 06:48:10 +08:00
for ( auto & it : tLogs ) {
2018-04-09 12:24:05 +08:00
if ( it - > isLocal & & it - > logServers . size ( ) ) {
2017-07-10 05:46:16 +08:00
vector < Future < Void > > tLogCommitResults ;
2017-07-12 06:48:10 +08:00
for ( int loc = 0 ; loc < it - > logServers . size ( ) ; loc + + ) {
2017-07-10 05:46:16 +08:00
Future < Void > commitMessage = reportTLogCommitErrors (
2017-07-12 06:48:10 +08:00
it - > logServers [ loc ] - > get ( ) . interf ( ) . commit . getReply (
2018-03-17 07:47:05 +08:00
TLogCommitRequest ( data . getArena ( ) , prevVersion , version , knownCommittedVersion , data . getMessages ( location ) , debugID ) , TaskTLogCommitReply ) ,
2017-07-10 05:46:16 +08:00
getDebugID ( ) ) ;
actors . add ( commitMessage ) ;
tLogCommitResults . push_back ( commitMessage ) ;
location + + ;
}
2017-07-12 06:48:10 +08:00
quorumResults . push_back ( quorum ( tLogCommitResults , tLogCommitResults . size ( ) - it - > tLogWriteAntiQuorum ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
2017-07-10 05:46:16 +08:00
return waitForAll ( quorumResults ) ;
2017-05-26 04:48:44 +08:00
}
2018-04-21 04:25:22 +08:00
virtual Reference < IPeekCursor > peek ( Version begin , Tag tag , bool parallelGetMore ) {
2018-02-23 08:13:56 +08:00
if ( ! tLogs . size ( ) ) {
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( ) , tag , begin , getPeekEnd ( ) , false , false ) ) ;
}
2017-08-04 07:16:36 +08:00
if ( tag . locality = = tagLocalityRemoteLog ) {
2018-02-23 08:13:56 +08:00
int bestSet = - 1 ;
for ( int t = 0 ; t < tLogs . size ( ) ; t + + ) {
if ( tLogs [ t ] - > logRouters . size ( ) ) {
ASSERT ( bestSet = = - 1 ) ;
bestSet = t ;
}
2017-07-14 03:29:21 +08:00
}
2018-02-23 08:13:56 +08:00
if ( bestSet = = - 1 ) {
2017-07-14 03:29:21 +08:00
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( ) , tag , begin , getPeekEnd ( ) , false , false ) ) ;
}
2018-04-13 09:14:23 +08:00
if ( oldLogData . size ( ) = = 0 | | begin > = tLogs [ 0 ] - > startVersion ) {
2018-03-30 06:12:38 +08:00
return Reference < ILogSystem : : MergedPeekCursor > ( new ILogSystem : : MergedPeekCursor ( tLogs [ bestSet ] - > logRouters , - 1 , ( int ) tLogs [ bestSet ] - > logRouters . size ( ) , tag , begin , getPeekEnd ( ) , false , std : : vector < LocalityData > ( ) , IRepPolicyRef ( ) , 0 ) ) ;
} else {
std : : vector < Reference < ILogSystem : : IPeekCursor > > cursors ;
std : : vector < LogMessageVersion > epochEnds ;
2018-04-13 09:14:23 +08:00
cursors . push_back ( Reference < ILogSystem : : MergedPeekCursor > ( new ILogSystem : : MergedPeekCursor ( tLogs [ bestSet ] - > logRouters , - 1 , ( int ) tLogs [ bestSet ] - > logRouters . size ( ) , tag , tLogs [ 0 ] - > startVersion , getPeekEnd ( ) , false , std : : vector < LocalityData > ( ) , IRepPolicyRef ( ) , 0 ) ) ) ;
Version lastBegin = tLogs [ 0 ] - > startVersion ;
2018-04-09 12:24:05 +08:00
for ( int i = 0 ; i < oldLogData . size ( ) & & begin < lastBegin ; i + + ) {
2018-03-30 06:12:38 +08:00
int bestOldSet = - 1 ;
for ( int t = 0 ; t < oldLogData [ i ] . tLogs . size ( ) ; t + + ) {
if ( oldLogData [ i ] . tLogs [ t ] - > logRouters . size ( ) ) {
2018-04-13 09:14:23 +08:00
ASSERT ( bestOldSet = = - 1 ) ;
2018-03-30 06:12:38 +08:00
bestOldSet = t ;
}
}
if ( bestOldSet = = - 1 ) {
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( ) , tag , begin , getPeekEnd ( ) , false , false ) ) ;
}
2018-04-09 12:24:05 +08:00
2018-04-13 09:14:23 +08:00
Version thisBegin = std : : max ( oldLogData [ i ] . tLogs [ 0 ] - > startVersion , begin ) ;
2018-03-30 06:12:38 +08:00
cursors . push_back ( Reference < ILogSystem : : MergedPeekCursor > ( new ILogSystem : : MergedPeekCursor ( oldLogData [ i ] . tLogs [ bestOldSet ] - > logRouters , - 1 , ( int ) oldLogData [ i ] . tLogs [ bestOldSet ] - > logRouters . size ( ) , tag ,
2018-04-09 12:24:05 +08:00
thisBegin , lastBegin , false , std : : vector < LocalityData > ( ) , IRepPolicyRef ( ) , 0 ) ) ) ;
epochEnds . push_back ( LogMessageVersion ( lastBegin ) ) ;
lastBegin = thisBegin ;
2018-03-30 06:12:38 +08:00
}
return Reference < ILogSystem : : MultiCursor > ( new ILogSystem : : MultiCursor ( cursors , epochEnds ) ) ;
}
2018-02-23 08:13:56 +08:00
} else {
2017-09-08 06:32:08 +08:00
int bestSet = - 1 ;
2018-04-09 12:24:05 +08:00
int nextBestSet = - 1 ;
std : : vector < Reference < LogSet > > localSets ;
for ( auto & log : tLogs ) {
if ( log - > isLocal & & log - > logServers . size ( ) ) {
localSets . push_back ( log ) ;
if ( log - > hasBestPolicy & & ( log - > locality = = tag . locality | | tag . locality = = tagLocalitySpecial | | log - > locality = = tagLocalitySpecial | | log - > locality = = tagLocalityUpgraded ) ) {
bestSet = localSets . size ( ) - 1 ;
nextBestSet = bestSet ;
}
if ( log - > hasBestPolicy & & bestSet = = - 1 ) {
nextBestSet = localSets . size ( ) - 1 ;
}
2017-09-08 06:32:08 +08:00
}
}
2018-04-09 12:24:05 +08:00
2017-07-11 08:41:32 +08:00
if ( oldLogData . size ( ) = = 0 | | begin > = oldLogData [ 0 ] . epochEnd ) {
2018-04-09 12:24:05 +08:00
return Reference < ILogSystem : : SetPeekCursor > ( new ILogSystem : : SetPeekCursor ( localSets , bestSet = = - 1 ? nextBestSet : bestSet ,
bestSet > = 0 ? localSets [ bestSet ] - > bestLocationFor ( tag ) : - 1 , tag , begin , getPeekEnd ( ) , parallelGetMore ) ) ;
2017-07-11 08:41:32 +08:00
} else {
std : : vector < Reference < ILogSystem : : IPeekCursor > > cursors ;
std : : vector < LogMessageVersion > epochEnds ;
2018-04-09 12:24:05 +08:00
cursors . push_back ( Reference < ILogSystem : : SetPeekCursor > ( new ILogSystem : : SetPeekCursor ( localSets , bestSet = = - 1 ? nextBestSet : bestSet ,
bestSet > = 0 ? localSets [ bestSet ] - > bestLocationFor ( tag ) : - 1 , tag , oldLogData [ 0 ] . epochEnd , getPeekEnd ( ) , parallelGetMore ) ) ) ;
2017-07-11 08:41:32 +08:00
for ( int i = 0 ; i < oldLogData . size ( ) & & begin < oldLogData [ i ] . epochEnd ; i + + ) {
2017-09-08 06:32:08 +08:00
int bestOldSet = - 1 ;
2018-04-09 12:24:05 +08:00
int nextBestOldSet = - 1 ;
std : : vector < Reference < LogSet > > localOldSets ;
for ( auto & log : oldLogData [ i ] . tLogs ) {
if ( log - > isLocal & & log - > logServers . size ( ) ) {
localOldSets . push_back ( log ) ;
if ( log - > hasBestPolicy & & ( log - > locality = = tag . locality | | tag . locality = = tagLocalitySpecial | | log - > locality = = tagLocalitySpecial | | log - > locality = = tagLocalityUpgraded ) ) {
bestOldSet = localOldSets . size ( ) - 1 ;
nextBestOldSet = bestOldSet ;
}
if ( log - > hasBestPolicy & & bestOldSet = = - 1 ) {
nextBestOldSet = localOldSets . size ( ) - 1 ;
}
2017-09-08 06:32:08 +08:00
}
}
2018-04-09 12:24:05 +08:00
cursors . push_back ( Reference < ILogSystem : : SetPeekCursor > ( new ILogSystem : : SetPeekCursor ( localOldSets , bestOldSet = = - 1 ? nextBestOldSet : bestOldSet ,
bestOldSet > = 0 ? localOldSets [ bestOldSet ] - > bestLocationFor ( tag ) : - 1 , tag , i + 1 = = oldLogData . size ( ) ? begin : std : : max ( oldLogData [ i + 1 ] . epochEnd , begin ) , oldLogData [ i ] . epochEnd , parallelGetMore ) ) ) ;
2017-07-11 08:41:32 +08:00
epochEnds . push_back ( LogMessageVersion ( oldLogData [ i ] . epochEnd ) ) ;
}
2017-05-26 04:48:44 +08:00
2017-07-11 08:41:32 +08:00
return Reference < ILogSystem : : MultiCursor > ( new ILogSystem : : MultiCursor ( cursors , epochEnds ) ) ;
}
2017-05-26 04:48:44 +08:00
}
}
2018-04-21 04:25:22 +08:00
virtual Reference < IPeekCursor > peek ( Version begin , std : : vector < Tag > tags , bool parallelGetMore ) {
2018-03-30 06:12:38 +08:00
if ( tags . empty ( ) ) {
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( ) , invalidTag , begin , getPeekEnd ( ) , false , false ) ) ;
}
if ( tags . size ( ) = = 1 ) {
2018-04-21 04:25:22 +08:00
return peek ( begin , tags [ 0 ] , parallelGetMore ) ;
2018-03-30 06:12:38 +08:00
}
std : : vector < Reference < ILogSystem : : IPeekCursor > > cursors ;
for ( auto tag : tags ) {
2018-04-21 04:25:22 +08:00
cursors . push_back ( peek ( begin , tag , parallelGetMore ) ) ;
2018-03-30 06:12:38 +08:00
}
2018-03-30 09:19:29 +08:00
return Reference < ILogSystem : : MergedPeekCursor > ( new ILogSystem : : MergedPeekCursor ( cursors , begin , tLogs . size ( ) & & tLogs [ 0 ] - > locality = = tagLocalityUpgraded ) ) ;
2018-03-30 06:12:38 +08:00
}
Reference < IPeekCursor > peekLocal ( Tag tag , Version begin , Version end ) {
2018-02-24 04:26:19 +08:00
int bestSet = - 1 ;
for ( int t = 0 ; t < tLogs . size ( ) ; t + + ) {
2018-04-09 12:24:05 +08:00
if ( tLogs [ t ] - > logServers . size ( ) & & tLogs [ t ] - > hasBestPolicy & & ( tLogs [ t ] - > locality = = tag . locality | | tag . locality = = tagLocalitySpecial | | tLogs [ t ] - > locality = = tagLocalitySpecial | | tLogs [ t ] - > locality = = tagLocalityUpgraded | | ( tLogs [ t ] - > isLocal & & tag . locality = = tagLocalityLogRouter ) ) ) {
2018-02-24 04:26:19 +08:00
bestSet = t ;
break ;
2017-07-14 03:29:21 +08:00
}
2018-02-24 04:26:19 +08:00
}
2018-04-09 12:24:05 +08:00
if ( bestSet = = - 1 ) {
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( ) , tag , begin , getPeekEnd ( ) , false , false ) ) ;
}
if ( oldLogData . size ( ) = = 0 | | begin > = tLogs [ bestSet ] - > startVersion ) {
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( tLogs [ bestSet ] - > logServers [ tLogs [ bestSet ] - > bestLocationFor ( tag ) ] , tag , begin , end , false , false ) ) ;
2018-02-24 04:26:19 +08:00
} else {
std : : vector < Reference < ILogSystem : : IPeekCursor > > cursors ;
std : : vector < LogMessageVersion > epochEnds ;
2018-03-30 06:12:38 +08:00
2018-04-09 12:24:05 +08:00
if ( tLogs [ bestSet ] - > startVersion < end ) {
cursors . push_back ( Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( tLogs [ bestSet ] - > logServers [ tLogs [ bestSet ] - > bestLocationFor ( tag ) ] , tag , tLogs [ bestSet ] - > startVersion , end , false , false ) ) ) ;
2018-03-31 08:39:45 +08:00
}
2018-04-09 12:24:05 +08:00
Version lastBegin = tLogs [ bestSet ] - > startVersion ;
for ( int i = 0 ; i < oldLogData . size ( ) & & begin < lastBegin ; i + + ) {
2018-03-30 06:12:38 +08:00
int bestOldSet = - 1 ;
for ( int t = 0 ; t < oldLogData [ i ] . tLogs . size ( ) ; t + + ) {
2018-04-09 12:24:05 +08:00
if ( oldLogData [ i ] . tLogs [ t ] - > logServers . size ( ) & & oldLogData [ i ] . tLogs [ t ] - > hasBestPolicy & & ( oldLogData [ i ] . tLogs [ t ] - > locality = = tag . locality | | tag . locality = = tagLocalitySpecial | | oldLogData [ i ] . tLogs [ t ] - > locality = = tagLocalitySpecial | | oldLogData [ i ] . tLogs [ t ] - > locality = = tagLocalityUpgraded | | ( oldLogData [ i ] . tLogs [ t ] - > isLocal & & tag . locality = = tagLocalityLogRouter ) ) ) {
2018-03-30 06:12:38 +08:00
bestOldSet = t ;
2018-02-24 04:26:19 +08:00
break ;
2018-01-29 03:52:54 +08:00
}
}
2018-03-30 06:12:38 +08:00
if ( bestOldSet = = - 1 ) {
2018-04-15 10:06:24 +08:00
continue ;
2018-03-30 06:12:38 +08:00
}
2018-03-31 08:39:45 +08:00
2018-04-09 12:24:05 +08:00
Version thisBegin = std : : max ( oldLogData [ i ] . tLogs [ bestOldSet ] - > startVersion , begin ) ;
2018-03-31 08:39:45 +08:00
if ( thisBegin < end ) {
cursors . push_back ( Reference < ILogSystem : : MergedPeekCursor > ( new ILogSystem : : MergedPeekCursor ( oldLogData [ i ] . tLogs [ bestOldSet ] - > logServers , oldLogData [ i ] . tLogs [ bestOldSet ] - > bestLocationFor ( tag ) , oldLogData [ i ] . tLogs [ bestOldSet ] - > logServers . size ( ) + 1 - oldLogData [ i ] . tLogs [ bestOldSet ] - > tLogReplicationFactor , tag ,
2018-04-09 12:24:05 +08:00
thisBegin , std : : min ( lastBegin , end ) , false , oldLogData [ i ] . tLogs [ bestOldSet ] - > tLogLocalities , oldLogData [ i ] . tLogs [ bestOldSet ] - > tLogPolicy , oldLogData [ i ] . tLogs [ bestOldSet ] - > tLogReplicationFactor ) ) ) ;
epochEnds . push_back ( LogMessageVersion ( std : : min ( lastBegin , end ) ) ) ;
2018-03-31 08:39:45 +08:00
}
2018-04-09 12:24:05 +08:00
lastBegin = thisBegin ;
2018-03-30 06:12:38 +08:00
}
return Reference < ILogSystem : : MultiCursor > ( new ILogSystem : : MultiCursor ( cursors , epochEnds ) ) ;
}
}
virtual Reference < IPeekCursor > peekSingle ( Version begin , Tag tag , vector < pair < Version , Tag > > history ) {
while ( history . size ( ) & & begin > = history . back ( ) . first ) {
history . pop_back ( ) ;
}
if ( history . size ( ) = = 0 ) {
return peekLocal ( tag , begin , getPeekEnd ( ) ) ;
} else {
std : : vector < Reference < ILogSystem : : IPeekCursor > > cursors ;
std : : vector < LogMessageVersion > epochEnds ;
cursors . push_back ( peekLocal ( tag , history [ 0 ] . first , getPeekEnd ( ) ) ) ;
for ( int i = 0 ; i < history . size ( ) ; i + + ) {
cursors . push_back ( peekLocal ( history [ i ] . second , i + 1 = = history . size ( ) ? begin : std : : max ( history [ i + 1 ] . first , begin ) , history [ i ] . first ) ) ;
2018-02-24 04:26:19 +08:00
epochEnds . push_back ( LogMessageVersion ( history [ i ] . first ) ) ;
2017-09-08 06:32:08 +08:00
}
2018-02-24 04:26:19 +08:00
return Reference < ILogSystem : : MultiCursor > ( new ILogSystem : : MultiCursor ( cursors , epochEnds ) ) ;
2017-05-26 04:48:44 +08:00
}
}
2018-03-30 06:12:38 +08:00
virtual Reference < IPeekCursor > peekLogRouter ( Version begin , Tag tag , UID logRouterID ) {
bool found = false ;
for ( auto & log : tLogs ) {
for ( auto & router : log - > logRouters ) {
if ( router - > get ( ) . id ( ) = = logRouterID ) {
found = true ;
break ;
}
}
if ( found ) {
break ;
}
}
if ( found ) {
for ( auto & log : tLogs ) {
2018-04-09 12:24:05 +08:00
if ( log - > logServers . size ( ) & & log - > isLocal & & log - > hasBestPolicy ) {
2018-03-30 06:12:38 +08:00
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( log - > logServers [ log - > bestLocationFor ( tag ) ] , tag , begin , getPeekEnd ( ) , false , false ) ) ;
}
}
}
for ( auto & old : oldLogData ) {
found = false ;
for ( auto & log : old . tLogs ) {
for ( auto & router : log - > logRouters ) {
if ( router - > get ( ) . id ( ) = = logRouterID ) {
found = true ;
break ;
}
}
if ( found ) {
break ;
}
}
if ( found ) {
2018-04-09 12:24:05 +08:00
int bestSet = - 1 ;
int nextBestSet = - 1 ;
std : : vector < Reference < LogSet > > localSets ;
for ( auto & log : old . tLogs ) {
if ( log - > isLocal & & log - > logServers . size ( ) ) {
localSets . push_back ( log ) ;
if ( log - > hasBestPolicy & & ( log - > locality = = tag . locality | | tag . locality = = tagLocalitySpecial | | log - > locality = = tagLocalitySpecial | | log - > locality = = tagLocalityUpgraded ) ) {
bestSet = localSets . size ( ) - 1 ;
nextBestSet = bestSet ;
}
if ( log - > hasBestPolicy & & bestSet = = - 1 ) {
nextBestSet = localSets . size ( ) - 1 ;
}
2018-03-30 06:12:38 +08:00
}
}
2018-04-09 12:24:05 +08:00
//FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
return Reference < ILogSystem : : SetPeekCursor > ( new ILogSystem : : SetPeekCursor ( localSets , bestSet = = - 1 ? nextBestSet : bestSet ,
bestSet > = 0 ? localSets [ bestSet ] - > bestLocationFor ( tag ) : - 1 , tag , begin , old . epochEnd , false ) ) ;
2018-03-30 06:12:38 +08:00
}
}
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( ) , tag , begin , getPeekEnd ( ) , false , false ) ) ;
}
2018-04-19 03:07:29 +08:00
void popLogRouter ( Version upTo , Tag tag , Version knownCommittedVersion , int8_t popLocality ) { //FIXME: do not need to pop all generations of old logs
2017-07-10 05:46:16 +08:00
if ( ! upTo ) return ;
for ( auto & t : tLogs ) {
2018-04-09 12:24:05 +08:00
if ( t - > locality = = popLocality ) {
for ( auto & log : t - > logRouters ) {
2018-04-19 03:07:29 +08:00
Version prev = outstandingPops [ std : : make_pair ( log - > get ( ) . id ( ) , tag ) ] . first ;
2018-04-09 12:24:05 +08:00
if ( prev < upTo )
2018-04-19 03:07:29 +08:00
outstandingPops [ std : : make_pair ( log - > get ( ) . id ( ) , tag ) ] = std : : make_pair ( upTo , knownCommittedVersion ) ;
2018-04-09 12:24:05 +08:00
if ( prev = = 0 )
actors . add ( popFromLog ( this , log , tag , 0.0 ) ) ; //Fast pop time because log routers can only hold 5 seconds of data.
}
}
}
for ( auto & old : oldLogData ) {
for ( auto & t : old . tLogs ) {
if ( t - > locality = = popLocality ) {
for ( auto & log : t - > logRouters ) {
2018-04-19 03:07:29 +08:00
Version prev = outstandingPops [ std : : make_pair ( log - > get ( ) . id ( ) , tag ) ] . first ;
2018-04-09 12:24:05 +08:00
if ( prev < upTo )
2018-04-19 03:07:29 +08:00
outstandingPops [ std : : make_pair ( log - > get ( ) . id ( ) , tag ) ] = std : : make_pair ( upTo , knownCommittedVersion ) ;
2018-04-09 12:24:05 +08:00
if ( prev = = 0 )
actors . add ( popFromLog ( this , log , tag , 0.0 ) ) ;
}
}
}
}
}
2018-04-19 03:07:29 +08:00
virtual void pop ( Version upTo , Tag tag , Version knownCommittedVersion , int8_t popLocality ) {
2018-04-09 12:24:05 +08:00
if ( upTo < = 0 ) return ;
if ( tag . locality = = tagLocalityRemoteLog ) {
2018-04-19 03:07:29 +08:00
popLogRouter ( upTo , tag , knownCommittedVersion , popLocality ) ;
2018-04-09 12:24:05 +08:00
return ;
}
ASSERT ( popLocality = = tagLocalityInvalid ) ;
for ( auto & t : tLogs ) {
for ( auto & log : t - > logServers ) {
2018-04-19 03:07:29 +08:00
Version prev = outstandingPops [ std : : make_pair ( log - > get ( ) . id ( ) , tag ) ] . first ;
2017-07-10 05:46:16 +08:00
if ( prev < upTo )
2018-04-19 03:07:29 +08:00
outstandingPops [ std : : make_pair ( log - > get ( ) . id ( ) , tag ) ] = std : : make_pair ( upTo , knownCommittedVersion ) ;
2017-07-10 05:46:16 +08:00
if ( prev = = 0 )
2018-04-09 12:24:05 +08:00
actors . add ( popFromLog ( this , log , tag , 1.0 ) ) ; //< FIXME: knob
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
}
2018-04-09 12:24:05 +08:00
ACTOR static Future < Void > popFromLog ( TagPartitionedLogSystem * self , Reference < AsyncVar < OptionalInterface < TLogInterface > > > log , Tag tag , double time ) {
2017-05-26 04:48:44 +08:00
state Version last = 0 ;
loop {
2018-04-09 12:24:05 +08:00
Void _ = wait ( delay ( time ) ) ;
2017-05-26 04:48:44 +08:00
2018-04-19 03:07:29 +08:00
state std : : pair < Version , Version > to = self - > outstandingPops [ std : : make_pair ( log - > get ( ) . id ( ) , tag ) ] ;
2017-05-26 04:48:44 +08:00
2018-04-19 03:07:29 +08:00
if ( to . first < = last ) {
2017-07-10 05:46:16 +08:00
self - > outstandingPops . erase ( std : : make_pair ( log - > get ( ) . id ( ) , tag ) ) ;
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
try {
2017-07-10 05:46:16 +08:00
if ( ! log - > get ( ) . present ( ) )
2017-05-26 04:48:44 +08:00
return Void ( ) ;
2018-04-19 03:07:29 +08:00
Void _ = wait ( log - > get ( ) . interf ( ) . popMessages . getReply ( TLogPopRequest ( to . first , to . second , tag ) ) ) ;
2017-05-26 04:48:44 +08:00
2018-04-19 03:07:29 +08:00
last = to . first ;
2017-05-26 04:48:44 +08:00
} catch ( Error & e ) {
if ( e . code ( ) = = error_code_actor_cancelled ) throw ;
2017-07-10 05:46:16 +08:00
TraceEvent ( ( e . code ( ) = = error_code_broken_promise ) ? SevInfo : SevError , " LogPopError " , self - > dbgid ) . detail ( " Log " , log - > get ( ) . id ( ) ) . error ( e ) ;
2017-05-26 04:48:44 +08:00
return Void ( ) ; // Leaving outstandingPops filled in means no further pop requests to this tlog from this logSystem
}
}
}
2017-10-06 08:09:44 +08:00
ACTOR static Future < Void > confirmEpochLive_internal ( Reference < LogSet > logSet , Optional < UID > debugID ) {
2017-08-29 04:46:14 +08:00
state vector < Future < Void > > alive ;
2017-09-23 07:19:16 +08:00
int numPresent = 0 ;
2017-10-06 08:09:44 +08:00
for ( auto & t : logSet - > logServers ) {
2017-08-29 04:46:14 +08:00
if ( t - > get ( ) . present ( ) ) {
alive . push_back ( brokenPromiseToNever (
t - > get ( ) . interf ( ) . confirmRunning . getReply ( TLogConfirmRunningRequest ( debugID ) ,
TaskTLogConfirmRunningReply ) ) ) ;
2017-09-23 07:19:16 +08:00
numPresent + + ;
2017-08-29 04:46:14 +08:00
} else {
alive . push_back ( Never ( ) ) ;
}
2017-05-26 04:48:44 +08:00
}
2017-08-29 04:46:14 +08:00
2017-10-06 08:09:44 +08:00
Void _ = wait ( quorum ( alive , std : : min ( logSet - > tLogReplicationFactor , numPresent - logSet - > tLogWriteAntiQuorum ) ) ) ;
2017-09-22 08:59:30 +08:00
2017-11-16 13:05:10 +08:00
state Reference < LocalityGroup > locked ( new LocalityGroup ( ) ) ;
state std : : vector < bool > responded ( alive . size ( ) ) ;
for ( int i = 0 ; i < alive . size ( ) ; i + + ) {
responded [ i ] = false ;
}
2017-08-29 04:46:14 +08:00
loop {
for ( int i = 0 ; i < alive . size ( ) ; i + + ) {
2017-11-16 13:05:10 +08:00
if ( ! responded [ i ] & & alive [ i ] . isReady ( ) & & ! alive [ i ] . isError ( ) ) {
2018-01-06 03:33:42 +08:00
locked - > add ( logSet - > tLogLocalities [ i ] ) ;
2017-11-16 13:05:10 +08:00
responded [ i ] = true ;
2017-08-29 04:46:14 +08:00
}
}
2018-01-06 03:33:42 +08:00
bool quorum_obtained = locked - > validate ( logSet - > tLogPolicy ) ;
2017-11-16 13:05:10 +08:00
// We intentionally skip considering antiquorums, as the CPU cost of doing so is prohibitive.
2018-01-06 03:33:42 +08:00
if ( logSet - > tLogReplicationFactor = = 1 & & locked - > size ( ) > 0 ) {
2017-08-29 04:46:14 +08:00
ASSERT ( quorum_obtained ) ;
}
if ( quorum_obtained ) {
return Void ( ) ;
}
// The current set of responders that we have weren't enough to form a quorum, so we must
// wait for more responses and try again.
std : : vector < Future < Void > > changes ;
for ( int i = 0 ; i < alive . size ( ) ; i + + ) {
if ( ! alive [ i ] . isReady ( ) ) {
changes . push_back ( ready ( alive [ i ] ) ) ;
2017-09-14 06:45:09 +08:00
} else if ( alive [ i ] . isReady ( ) & & alive [ i ] . isError ( ) & &
2017-09-14 06:49:39 +08:00
alive [ i ] . getError ( ) . code ( ) = = error_code_tlog_stopped ) {
2017-09-14 06:45:09 +08:00
// All commits must go to all TLogs. If any TLog is stopped, then our epoch has ended.
return Never ( ) ;
2017-08-29 04:46:14 +08:00
}
}
ASSERT ( changes . size ( ) ! = 0 ) ;
Void _ = wait ( waitForAny ( changes ) ) ;
}
}
2018-01-08 07:32:43 +08:00
// Returns success after confirming that pushes in the current epoch are still possible
2017-05-26 04:48:44 +08:00
virtual Future < Void > confirmEpochLive ( Optional < UID > debugID ) {
2017-07-10 05:46:16 +08:00
vector < Future < Void > > quorumResults ;
for ( auto & it : tLogs ) {
2018-04-09 12:24:05 +08:00
if ( it - > isLocal & & it - > logServers . size ( ) ) {
2017-10-06 08:09:44 +08:00
quorumResults . push_back ( confirmEpochLive_internal ( it , debugID ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
2017-07-10 05:46:16 +08:00
return waitForAll ( quorumResults ) ;
2017-05-26 04:48:44 +08:00
}
2017-08-04 07:16:36 +08:00
virtual Future < Void > endEpoch ( ) {
std : : vector < Future < Void > > lockResults ;
for ( auto & logSet : tLogs ) {
for ( auto & log : logSet - > logServers ) {
lockResults . push_back ( success ( lockTLog ( dbgid , log ) ) ) ;
}
}
return waitForAll ( lockResults ) ;
}
2017-09-08 06:32:08 +08:00
virtual Future < Reference < ILogSystem > > newEpoch ( RecruitFromConfigurationReply const & recr , Future < RecruitRemoteFromConfigurationReply > const & fRemoteWorkers , DatabaseConfiguration const & config , LogEpoch recoveryCount , int8_t primaryLocality , int8_t remoteLocality ) {
2017-05-26 04:48:44 +08:00
// Call only after end_epoch() has successfully completed. Returns a new epoch immediately following this one. The new epoch
// is only provisional until the caller updates the coordinated DBCoreState
2017-09-08 06:32:08 +08:00
return newEpoch ( Reference < TagPartitionedLogSystem > : : addRef ( this ) , recr , fRemoteWorkers , config , recoveryCount , primaryLocality , remoteLocality ) ;
2017-05-26 04:48:44 +08:00
}
virtual LogSystemConfig getLogSystemConfig ( ) {
LogSystemConfig logSystemConfig ;
logSystemConfig . logSystemType = logSystemType ;
2017-09-08 06:32:08 +08:00
logSystemConfig . expectedLogSets = expectedLogSets ;
2018-04-09 12:24:05 +08:00
logSystemConfig . logRouterTags = logRouterTags ;
2018-04-21 04:25:22 +08:00
logSystemConfig . recruitmentID = recruitmentID ;
2017-07-12 06:48:10 +08:00
for ( int i = 0 ; i < tLogs . size ( ) ; i + + ) {
Reference < LogSet > logSet = tLogs [ i ] ;
2017-09-08 06:32:08 +08:00
if ( logSet - > isLocal | | remoteLogsWrittenToCoreState ) {
logSystemConfig . tLogs . push_back ( TLogSet ( ) ) ;
TLogSet & log = logSystemConfig . tLogs . back ( ) ;
log . tLogWriteAntiQuorum = logSet - > tLogWriteAntiQuorum ;
log . tLogReplicationFactor = logSet - > tLogReplicationFactor ;
log . tLogPolicy = logSet - > tLogPolicy ;
log . tLogLocalities = logSet - > tLogLocalities ;
log . isLocal = logSet - > isLocal ;
2018-01-30 09:48:18 +08:00
log . hasBestPolicy = logSet - > hasBestPolicy ;
2017-09-08 06:32:08 +08:00
log . locality = logSet - > locality ;
2018-03-30 06:12:38 +08:00
log . startVersion = logSet - > startVersion ;
2017-09-08 06:32:08 +08:00
for ( int i = 0 ; i < logSet - > logServers . size ( ) ; i + + ) {
log . tLogs . push_back ( logSet - > logServers [ i ] - > get ( ) ) ;
}
2017-07-10 05:46:16 +08:00
2017-09-08 06:32:08 +08:00
for ( int i = 0 ; i < logSet - > logRouters . size ( ) ; i + + ) {
log . logRouters . push_back ( logSet - > logRouters [ i ] - > get ( ) ) ;
}
2017-07-10 05:46:16 +08:00
}
}
2017-05-26 04:48:44 +08:00
if ( ! recoveryCompleteWrittenToCoreState ) {
for ( int i = 0 ; i < oldLogData . size ( ) ; i + + ) {
logSystemConfig . oldTLogs . push_back ( OldTLogConf ( ) ) ;
2017-07-10 05:46:16 +08:00
2017-07-12 06:48:10 +08:00
logSystemConfig . oldTLogs [ i ] . tLogs . resize ( oldLogData [ i ] . tLogs . size ( ) ) ;
for ( int j = 0 ; j < oldLogData [ i ] . tLogs . size ( ) ; j + + ) {
TLogSet & log = logSystemConfig . oldTLogs [ i ] . tLogs [ j ] ;
Reference < LogSet > logSet = oldLogData [ i ] . tLogs [ j ] ;
log . tLogWriteAntiQuorum = logSet - > tLogWriteAntiQuorum ;
log . tLogReplicationFactor = logSet - > tLogReplicationFactor ;
log . tLogPolicy = logSet - > tLogPolicy ;
log . tLogLocalities = logSet - > tLogLocalities ;
log . isLocal = logSet - > isLocal ;
2018-01-30 09:48:18 +08:00
log . hasBestPolicy = logSet - > hasBestPolicy ;
2017-09-08 06:32:08 +08:00
log . locality = logSet - > locality ;
2018-03-30 06:12:38 +08:00
log . startVersion = logSet - > startVersion ;
2017-07-12 06:48:10 +08:00
for ( int i = 0 ; i < logSet - > logServers . size ( ) ; i + + ) {
log . tLogs . push_back ( logSet - > logServers [ i ] - > get ( ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
for ( int i = 0 ; i < logSet - > logRouters . size ( ) ; i + + ) {
log . logRouters . push_back ( logSet - > logRouters [ i ] - > get ( ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
2018-04-09 12:24:05 +08:00
logSystemConfig . oldTLogs [ i ] . logRouterTags = oldLogData [ i ] . logRouterTags ;
2017-05-26 04:48:44 +08:00
logSystemConfig . oldTLogs [ i ] . epochEnd = oldLogData [ i ] . epochEnd ;
}
}
return logSystemConfig ;
}
virtual Standalone < StringRef > getLogsValue ( ) {
vector < std : : pair < UID , NetworkAddress > > logs ;
vector < std : : pair < UID , NetworkAddress > > oldLogs ;
2017-07-10 05:46:16 +08:00
for ( auto & t : tLogs ) {
2017-09-08 06:32:08 +08:00
if ( t - > isLocal | | remoteLogsWrittenToCoreState ) {
for ( int i = 0 ; i < t - > logServers . size ( ) ; i + + ) {
logs . push_back ( std : : make_pair ( t - > logServers [ i ] - > get ( ) . id ( ) , t - > logServers [ i ] - > get ( ) . present ( ) ? t - > logServers [ i ] - > get ( ) . interf ( ) . address ( ) : NetworkAddress ( ) ) ) ;
}
2017-07-10 05:46:16 +08:00
}
}
2017-05-26 04:48:44 +08:00
if ( ! recoveryCompleteWrittenToCoreState ) {
for ( int i = 0 ; i < oldLogData . size ( ) ; i + + ) {
2017-07-10 05:46:16 +08:00
for ( auto & t : oldLogData [ i ] . tLogs ) {
2017-07-12 06:48:10 +08:00
for ( int j = 0 ; j < t - > logServers . size ( ) ; j + + ) {
oldLogs . push_back ( std : : make_pair ( t - > logServers [ j ] - > get ( ) . id ( ) , t - > logServers [ j ] - > get ( ) . present ( ) ? t - > logServers [ j ] - > get ( ) . interf ( ) . address ( ) : NetworkAddress ( ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
}
}
return logsValue ( logs , oldLogs ) ;
}
virtual Future < Void > onLogSystemConfigChange ( ) {
std : : vector < Future < Void > > changes ;
2018-04-22 03:57:00 +08:00
changes . push_back ( logSystemConfigChanged . onTrigger ( ) ) ;
2017-07-10 05:46:16 +08:00
for ( auto & t : tLogs ) {
2017-07-12 06:48:10 +08:00
for ( int i = 0 ; i < t - > logServers . size ( ) ; i + + ) {
changes . push_back ( t - > logServers [ i ] - > onChange ( ) ) ;
2017-07-10 05:46:16 +08:00
}
}
2018-04-09 12:24:05 +08:00
for ( int i = 0 ; i < oldLogData . size ( ) ; i + + ) {
2017-07-10 05:46:16 +08:00
for ( auto & t : oldLogData [ i ] . tLogs ) {
2017-07-12 06:48:10 +08:00
for ( int j = 0 ; j < t - > logServers . size ( ) ; j + + ) {
changes . push_back ( t - > logServers [ j ] - > onChange ( ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
}
2018-04-09 12:24:05 +08:00
if ( hasRemoteServers & & ! remoteRecovery . isReady ( ) ) {
changes . push_back ( remoteRecovery ) ;
}
2017-05-26 04:48:44 +08:00
return waitForAny ( changes ) ;
}
virtual Version getEnd ( ) {
ASSERT ( epochEndVersion . present ( ) ) ;
return epochEndVersion . get ( ) + 1 ;
}
Version getPeekEnd ( ) {
if ( epochEndVersion . present ( ) )
return getEnd ( ) ;
else
return std : : numeric_limits < Version > : : max ( ) ;
}
virtual void getPushLocations ( std : : vector < Tag > const & tags , std : : vector < int > & locations ) {
2017-07-10 05:46:16 +08:00
int locationOffset = 0 ;
for ( auto & log : tLogs ) {
2018-04-09 12:24:05 +08:00
if ( log - > isLocal & & log - > logServers . size ( ) ) {
2017-07-12 06:48:10 +08:00
log - > getPushLocations ( tags , locations , locationOffset ) ;
locationOffset + = log - > logServers . size ( ) ;
2017-05-26 04:48:44 +08:00
}
}
}
2018-01-09 04:04:19 +08:00
virtual bool hasRemoteLogs ( ) {
2018-04-09 12:24:05 +08:00
return logRouterTags > 0 ;
2018-01-09 04:04:19 +08:00
}
virtual void addRemoteTags ( int logSet , std : : vector < Tag > const & originalTags , std : : vector < int > & tags ) {
tLogs [ logSet ] - > getPushLocations ( originalTags , tags , 0 ) ;
}
2017-07-10 05:46:16 +08:00
virtual Tag getRandomRouterTag ( ) {
2018-04-09 12:24:05 +08:00
return Tag ( tagLocalityLogRouter , g_random - > randomInt ( 0 , logRouterTags ) ) ;
2017-05-26 04:48:44 +08:00
}
2018-03-31 10:08:01 +08:00
virtual const std : : set < Tag > & getEpochEndTags ( ) {
return epochEndTags ;
}
2017-05-26 04:48:44 +08:00
ACTOR static Future < Void > monitorLog ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > logServer , Reference < AsyncVar < bool > > failed ) {
state Future < Void > waitFailure ;
loop {
if ( logServer - > get ( ) . present ( ) )
waitFailure = waitFailureTracker ( logServer - > get ( ) . interf ( ) . waitFailure , failed ) ;
else
failed - > set ( true ) ;
Void _ = wait ( logServer - > onChange ( ) ) ;
}
}
2018-04-09 12:24:05 +08:00
ACTOR static Future < std : : pair < Version , Version > > getDurableVersion ( UID dbgid , LogLockInfo lockInfo , std : : vector < Reference < AsyncVar < bool > > > failed = std : : vector < Reference < AsyncVar < bool > > > ( ) ) {
state Reference < LogSet > logSet = lockInfo . logSet ;
loop {
// To ensure consistent recovery, the number of servers NOT in the write quorum plus the number of servers NOT in the read quorum
// have to be strictly less than the replication factor. Otherwise there could be a replica set consistent entirely of servers that
// are out of date due to not being in the write quorum or unavailable due to not being in the read quorum.
// So with N = # of tlogs, W = antiquorum, R = required count, F = replication factor,
// W + (N - R) < F, and optimally (N-W)+(N-R)=F-1. Thus R=N+1-F+W.
int requiredCount = ( int ) logSet - > logServers . size ( ) + 1 - logSet - > tLogReplicationFactor + logSet - > tLogWriteAntiQuorum ;
ASSERT ( requiredCount > 0 & & requiredCount < = logSet - > logServers . size ( ) ) ;
ASSERT ( logSet - > tLogReplicationFactor > = 1 & & logSet - > tLogReplicationFactor < = logSet - > logServers . size ( ) ) ;
ASSERT ( logSet - > tLogWriteAntiQuorum > = 0 & & logSet - > tLogWriteAntiQuorum < logSet - > logServers . size ( ) ) ;
std : : vector < LocalityData > availableItems , badCombo ;
std : : vector < TLogLockResult > results ;
std : : string sServerState ;
LocalityGroup unResponsiveSet ;
double t = timer ( ) ;
for ( int t = 0 ; t < logSet - > logServers . size ( ) ; t + + ) {
if ( lockInfo . replies [ t ] . isReady ( ) & & ! lockInfo . replies [ t ] . isError ( ) & & ( ! failed . size ( ) | | ! failed [ t ] - > get ( ) ) ) {
results . push_back ( lockInfo . replies [ t ] . get ( ) ) ;
availableItems . push_back ( logSet - > tLogLocalities [ t ] ) ;
sServerState + = ' a ' ;
}
else {
unResponsiveSet . add ( logSet - > tLogLocalities [ t ] ) ;
sServerState + = ' f ' ;
}
}
// Check if the list of results is not larger than the anti quorum
bool bTooManyFailures = ( results . size ( ) < = logSet - > tLogWriteAntiQuorum ) ;
// Check if failed logs complete the policy
bTooManyFailures = bTooManyFailures | | ( ( unResponsiveSet . size ( ) > = logSet - > tLogReplicationFactor ) & & ( unResponsiveSet . validate ( logSet - > tLogPolicy ) ) ) ;
// Check all combinations of the AntiQuorum within the failed
if ( ! bTooManyFailures & & ( logSet - > tLogWriteAntiQuorum ) & & ( ! validateAllCombinations ( badCombo , unResponsiveSet , logSet - > tLogPolicy , availableItems , logSet - > tLogWriteAntiQuorum , false ) ) ) {
TraceEvent ( " EpochEndBadCombo " , dbgid ) . detail ( " Required " , requiredCount ) . detail ( " Present " , results . size ( ) ) . detail ( " ServerState " , sServerState ) ;
bTooManyFailures = true ;
}
ASSERT ( logSet - > logServers . size ( ) = = lockInfo . replies . size ( ) ) ;
if ( ! bTooManyFailures ) {
std : : sort ( results . begin ( ) , results . end ( ) , sort_by_end ( ) ) ;
int absent = logSet - > logServers . size ( ) - results . size ( ) ;
int safe_range_begin = logSet - > tLogWriteAntiQuorum ;
int new_safe_range_begin = std : : min ( logSet - > tLogWriteAntiQuorum , ( int ) ( results . size ( ) - 1 ) ) ;
int safe_range_end = logSet - > tLogReplicationFactor - absent ;
Version knownCommittedVersion = results [ new_safe_range_begin ] . end - ( g_network - > isSimulated ( ) ? 10 * SERVER_KNOBS - > VERSIONS_PER_SECOND : SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) ; //In simulation this must be the maximum MAX_READ_TRANSACTION_LIFE_VERSIONS
for ( int i = 0 ; i < results . size ( ) ; i + + ) {
knownCommittedVersion = std : : max ( knownCommittedVersion , results [ i ] . knownCommittedVersion ) ;
}
2018-04-22 07:03:28 +08:00
TraceEvent ( " GetDurableResult " , dbgid ) . detail ( " Required " , requiredCount ) . detail ( " Present " , results . size ( ) ) . detail ( " ServerState " , sServerState )
2018-04-09 12:24:05 +08:00
. detail ( " RecoveryVersion " , ( ( safe_range_end > 0 ) & & ( safe_range_end - 1 < results . size ( ) ) ) ? results [ safe_range_end - 1 ] . end : - 1 )
. detail ( " EndVersion " , results [ new_safe_range_begin ] . end ) . detail ( " SafeBegin " , safe_range_begin ) . detail ( " SafeEnd " , safe_range_end )
2018-04-22 07:03:28 +08:00
. detail ( " NewSafeBegin " , new_safe_range_begin ) . detail ( " knownCommittedVersion " , knownCommittedVersion ) . detail ( " epochEnd " , lockInfo . epochEnd ) ;
2018-04-09 12:24:05 +08:00
2018-04-22 07:03:28 +08:00
return std : : make_pair ( std : : min ( knownCommittedVersion + 1 , lockInfo . epochEnd ) , results [ new_safe_range_begin ] . end ) ;
2018-04-09 12:24:05 +08:00
}
TraceEvent ( " LogSystemWaitingForRecovery " , dbgid ) . detail ( " Required " , requiredCount ) . detail ( " Present " , results . size ( ) ) . detail ( " ServerState " , sServerState ) ;
// Wait for anything relevant to change
std : : vector < Future < Void > > changes ;
for ( int j = 0 ; j < logSet - > logServers . size ( ) ; j + + ) {
if ( ! lockInfo . replies [ j ] . isReady ( ) )
changes . push_back ( ready ( lockInfo . replies [ j ] ) ) ;
else {
changes . push_back ( logSet - > logServers [ j ] - > onChange ( ) ) ;
if ( failed . size ( ) ) {
changes . push_back ( failed [ j ] - > onChange ( ) ) ;
}
}
}
ASSERT ( changes . size ( ) ) ;
Void _ = wait ( waitForAny ( changes ) ) ;
}
}
2017-05-26 04:48:44 +08:00
ACTOR static Future < Void > epochEnd ( Reference < AsyncVar < Reference < ILogSystem > > > outLogSystem , UID dbgid , DBCoreState prevState , FutureStream < TLogRejoinRequest > rejoinRequests , LocalityData locality ) {
// Stops a co-quorum of tlogs so that no further versions can be committed until the DBCoreState coordination state is changed
// Creates a new logSystem representing the (now frozen) epoch
// No other important side effects.
// The writeQuorum in the master info is from the previous configuration
if ( ! prevState . tLogs . size ( ) ) {
// This is a brand new database
Reference < TagPartitionedLogSystem > logSystem ( new TagPartitionedLogSystem ( dbgid , locality ) ) ;
logSystem - > logSystemType = prevState . logSystemType ;
logSystem - > epochEndVersion = 0 ;
logSystem - > knownCommittedVersion = 0 ;
outLogSystem - > set ( logSystem ) ;
Void _ = wait ( Future < Void > ( Never ( ) ) ) ;
throw internal_error ( ) ;
}
TEST ( true ) ; // Master recovery from pre-existing database
// trackRejoins listens for rejoin requests from the tLogs that we are recovering from, to learn their TLogInterfaces
2018-03-30 06:12:38 +08:00
state std : : vector < LogLockInfo > lockResults ;
2017-05-26 04:48:44 +08:00
state std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > allLogServers ;
2017-07-12 06:48:10 +08:00
state std : : vector < Reference < LogSet > > logServers ;
2017-05-26 04:48:44 +08:00
state std : : vector < OldLogData > oldLogData ;
2017-07-10 05:46:16 +08:00
state std : : vector < std : : vector < Reference < AsyncVar < bool > > > > logFailed ;
2017-05-26 04:48:44 +08:00
state std : : vector < Future < Void > > failureTrackers ;
2017-07-12 06:48:10 +08:00
logServers . resize ( prevState . tLogs . size ( ) ) ;
for ( int i = 0 ; i < prevState . tLogs . size ( ) ; i + + ) {
Reference < LogSet > logSet = Reference < LogSet > ( new LogSet ( ) ) ;
logServers [ i ] = logSet ;
CoreTLogSet const & coreSet = prevState . tLogs [ i ] ;
2017-07-10 05:46:16 +08:00
std : : vector < Reference < AsyncVar < bool > > > failed ;
2017-07-12 06:48:10 +08:00
for ( int j = 0 ; j < coreSet . tLogs . size ( ) ; j + + ) {
Reference < AsyncVar < OptionalInterface < TLogInterface > > > logVar = Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( coreSet . tLogs [ j ] ) ) ) ;
logSet - > logServers . push_back ( logVar ) ;
2017-05-26 04:48:44 +08:00
allLogServers . push_back ( logVar ) ;
2017-07-10 05:46:16 +08:00
failed . push_back ( Reference < AsyncVar < bool > > ( new AsyncVar < bool > ( ) ) ) ;
2017-07-12 06:48:10 +08:00
failureTrackers . push_back ( monitorLog ( logVar , failed [ j ] ) ) ;
2017-07-10 05:46:16 +08:00
}
2017-07-12 06:48:10 +08:00
logSet - > tLogReplicationFactor = coreSet . tLogReplicationFactor ;
logSet - > tLogWriteAntiQuorum = coreSet . tLogWriteAntiQuorum ;
logSet - > tLogPolicy = coreSet . tLogPolicy ;
logSet - > tLogLocalities = coreSet . tLogLocalities ;
logSet - > isLocal = coreSet . isLocal ;
2018-01-30 09:48:18 +08:00
logSet - > hasBestPolicy = coreSet . hasBestPolicy ;
2017-09-08 06:32:08 +08:00
logSet - > locality = coreSet . locality ;
2018-03-30 06:12:38 +08:00
logSet - > startVersion = coreSet . startVersion ;
2017-07-10 05:46:16 +08:00
logFailed . push_back ( failed ) ;
}
2017-07-12 06:48:10 +08:00
oldLogData . resize ( prevState . oldTLogData . size ( ) ) ;
for ( int i = 0 ; i < prevState . oldTLogData . size ( ) ; i + + ) {
OldLogData & oldData = oldLogData [ i ] ;
OldTLogCoreData const & old = prevState . oldTLogData [ i ] ;
oldData . tLogs . resize ( old . tLogs . size ( ) ) ;
for ( int j = 0 ; j < old . tLogs . size ( ) ; j + + ) {
Reference < LogSet > logSet = Reference < LogSet > ( new LogSet ( ) ) ;
oldData . tLogs [ j ] = logSet ;
CoreTLogSet const & log = old . tLogs [ j ] ;
for ( int k = 0 ; k < log . tLogs . size ( ) ; k + + ) {
Reference < AsyncVar < OptionalInterface < TLogInterface > > > logVar = Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( log . tLogs [ k ] ) ) ) ;
logSet - > logServers . push_back ( logVar ) ;
2017-07-10 05:46:16 +08:00
allLogServers . push_back ( logVar ) ;
}
2017-07-12 06:48:10 +08:00
logSet - > tLogReplicationFactor = log . tLogReplicationFactor ;
logSet - > tLogWriteAntiQuorum = log . tLogWriteAntiQuorum ;
logSet - > tLogPolicy = log . tLogPolicy ;
logSet - > tLogLocalities = log . tLogLocalities ;
logSet - > isLocal = log . isLocal ;
2018-01-30 09:48:18 +08:00
logSet - > hasBestPolicy = log . hasBestPolicy ;
2017-09-08 06:32:08 +08:00
logSet - > locality = log . locality ;
2018-03-30 06:12:38 +08:00
logSet - > startVersion = log . startVersion ;
2017-05-26 04:48:44 +08:00
}
2017-07-10 05:46:16 +08:00
oldData . epochEnd = old . epochEnd ;
2018-04-09 12:24:05 +08:00
oldData . logRouterTags = old . logRouterTags ;
2017-05-26 04:48:44 +08:00
}
state Future < Void > rejoins = trackRejoins ( dbgid , allLogServers , rejoinRequests ) ;
2018-03-30 06:12:38 +08:00
lockResults . resize ( logServers . size ( ) ) ;
2018-04-09 12:24:05 +08:00
std : : set < int8_t > lockedLocalities ;
bool foundSpecial = false ;
2017-07-10 05:46:16 +08:00
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
2018-04-15 10:06:24 +08:00
if ( logServers [ i ] - > locality = = tagLocalitySpecial | | logServers [ i ] - > locality = = tagLocalityUpgraded ) {
2018-04-09 12:24:05 +08:00
foundSpecial = true ;
}
lockedLocalities . insert ( logServers [ i ] - > locality ) ;
2018-04-23 02:14:13 +08:00
lockResults [ i ] . isCurrent = true ;
2018-04-09 12:24:05 +08:00
lockResults [ i ] . logSet = logServers [ i ] ;
2017-07-12 06:48:10 +08:00
for ( int t = 0 ; t < logServers [ i ] - > logServers . size ( ) ; t + + ) {
2018-03-30 06:12:38 +08:00
lockResults [ i ] . replies . push_back ( lockTLog ( dbgid , logServers [ i ] - > logServers [ t ] ) ) ;
2017-07-10 05:46:16 +08:00
}
}
2017-05-26 04:48:44 +08:00
2018-04-09 12:24:05 +08:00
for ( auto & old : oldLogData ) {
if ( foundSpecial ) {
break ;
}
for ( auto & log : old . tLogs ) {
2018-04-15 10:06:24 +08:00
if ( log - > locality = = tagLocalitySpecial | | log - > locality = = tagLocalityUpgraded ) {
2018-04-09 12:24:05 +08:00
foundSpecial = true ;
break ;
}
if ( ! lockedLocalities . count ( log - > locality ) ) {
lockedLocalities . insert ( log - > locality ) ;
LogLockInfo lockResult ;
2018-04-22 07:03:28 +08:00
lockResult . epochEnd = old . epochEnd ;
2018-04-09 12:24:05 +08:00
lockResult . logSet = log ;
for ( int t = 0 ; t < log - > logServers . size ( ) ; t + + ) {
lockResult . replies . push_back ( lockTLog ( dbgid , log - > logServers [ t ] ) ) ;
}
lockResults . push_back ( lockResult ) ;
}
}
}
2017-05-26 04:48:44 +08:00
state Optional < Version > last_end ;
2017-07-10 05:46:16 +08:00
state int cycles = 0 ;
2017-05-26 04:48:44 +08:00
2018-04-23 02:14:13 +08:00
state Version knownCommittedVersion = 0 ;
2017-05-26 04:48:44 +08:00
loop {
2017-07-10 05:46:16 +08:00
Optional < Version > end ;
for ( int log = 0 ; log < logServers . size ( ) ; log + + ) {
2017-07-12 06:48:10 +08:00
if ( ! logServers [ log ] - > isLocal ) {
2017-07-10 05:46:16 +08:00
continue ;
2017-05-26 04:48:44 +08:00
}
2017-07-10 05:46:16 +08:00
// To ensure consistent recovery, the number of servers NOT in the write quorum plus the number of servers NOT in the read quorum
// have to be strictly less than the replication factor. Otherwise there could be a replica set consistent entirely of servers that
// are out of date due to not being in the write quorum or unavailable due to not being in the read quorum.
2017-10-06 08:09:44 +08:00
// So with N = # of tlogs, W = antiquorum, R = required count, F = replication factor,
// W + (N - R) < F, and optimally (N-W)+(N-R)=F-1. Thus R=N+1-F+W.
2017-07-12 06:48:10 +08:00
state int requiredCount = ( int ) logServers [ log ] - > logServers . size ( ) + 1 - logServers [ log ] - > tLogReplicationFactor + logServers [ log ] - > tLogWriteAntiQuorum ;
ASSERT ( requiredCount > 0 & & requiredCount < = logServers [ log ] - > logServers . size ( ) ) ;
ASSERT ( logServers [ log ] - > tLogReplicationFactor > = 1 & & logServers [ log ] - > tLogReplicationFactor < = logServers [ log ] - > logServers . size ( ) ) ;
ASSERT ( logServers [ log ] - > tLogWriteAntiQuorum > = 0 & & logServers [ log ] - > tLogWriteAntiQuorum < logServers [ log ] - > logServers . size ( ) ) ;
2017-07-10 05:46:16 +08:00
std : : vector < LocalityData > availableItems , badCombo ;
std : : vector < TLogLockResult > results ;
std : : string sServerState ;
LocalityGroup unResponsiveSet ;
double t = timer ( ) ;
cycles + + ;
2017-07-12 06:48:10 +08:00
for ( int t = 0 ; t < logServers [ log ] - > logServers . size ( ) ; t + + ) {
2018-03-30 06:12:38 +08:00
if ( lockResults [ log ] . replies [ t ] . isReady ( ) & & ! lockResults [ log ] . replies [ t ] . isError ( ) & & ! logFailed [ log ] [ t ] - > get ( ) ) {
results . push_back ( lockResults [ log ] . replies [ t ] . get ( ) ) ;
2017-07-12 06:48:10 +08:00
availableItems . push_back ( logServers [ log ] - > tLogLocalities [ t ] ) ;
2017-07-10 05:46:16 +08:00
sServerState + = ' a ' ;
}
else {
2017-07-12 06:48:10 +08:00
unResponsiveSet . add ( logServers [ log ] - > tLogLocalities [ t ] ) ;
2017-07-10 05:46:16 +08:00
sServerState + = ' f ' ;
}
2017-05-26 04:48:44 +08:00
}
2017-07-10 05:46:16 +08:00
// Check if the list of results is not larger than the anti quorum
2017-07-12 06:48:10 +08:00
bool bTooManyFailures = ( results . size ( ) < = logServers [ log ] - > tLogWriteAntiQuorum ) ;
2017-05-26 04:48:44 +08:00
2017-07-10 05:46:16 +08:00
// Check if failed logs complete the policy
2017-07-12 06:48:10 +08:00
bTooManyFailures = bTooManyFailures | | ( ( unResponsiveSet . size ( ) > = logServers [ log ] - > tLogReplicationFactor ) & & ( unResponsiveSet . validate ( logServers [ log ] - > tLogPolicy ) ) ) ;
2017-05-26 04:48:44 +08:00
2017-07-10 05:46:16 +08:00
// Check all combinations of the AntiQuorum within the failed
2017-07-12 06:48:10 +08:00
if ( ( ! bTooManyFailures ) & & ( logServers [ log ] - > tLogWriteAntiQuorum ) & & ( ! validateAllCombinations ( badCombo , unResponsiveSet , logServers [ log ] - > tLogPolicy , availableItems , logServers [ log ] - > tLogWriteAntiQuorum , false ) ) ) {
2017-07-10 05:46:16 +08:00
TraceEvent ( " EpochEndBadCombo " , dbgid ) . detail ( " Cycles " , cycles )
. detail ( " logNum " , log )
2017-05-26 04:48:44 +08:00
. detail ( " Required " , requiredCount )
. detail ( " Present " , results . size ( ) )
. detail ( " Available " , availableItems . size ( ) )
2017-07-12 06:48:10 +08:00
. detail ( " Absent " , logServers [ log ] - > logServers . size ( ) - results . size ( ) )
2017-05-26 04:48:44 +08:00
. detail ( " ServerState " , sServerState )
2017-07-12 06:48:10 +08:00
. detail ( " ReplicationFactor " , logServers [ log ] - > tLogReplicationFactor )
. detail ( " AntiQuorum " , logServers [ log ] - > tLogWriteAntiQuorum )
. detail ( " Policy " , logServers [ log ] - > tLogPolicy - > info ( ) )
2017-05-26 04:48:44 +08:00
. detail ( " TooManyFailures " , bTooManyFailures )
2017-07-12 06:48:10 +08:00
. detail ( " LogZones " , : : describeZones ( logServers [ log ] - > tLogLocalities ) )
. detail ( " LogDataHalls " , : : describeDataHalls ( logServers [ log ] - > tLogLocalities ) ) ;
2017-07-10 05:46:16 +08:00
bTooManyFailures = true ;
}
// If too many TLogs are failed for recovery to be possible, we could wait forever here.
//Void _ = wait( smartQuorum( tLogReply, requiredCount, SERVER_KNOBS->RECOVERY_TLOG_SMART_QUORUM_DELAY ) || rejoins );
2018-03-30 06:12:38 +08:00
ASSERT ( logServers [ log ] - > logServers . size ( ) = = lockResults [ log ] . replies . size ( ) ) ;
2017-07-10 05:46:16 +08:00
if ( ! bTooManyFailures ) {
std : : sort ( results . begin ( ) , results . end ( ) , sort_by_end ( ) ) ;
2017-07-12 06:48:10 +08:00
int absent = logServers [ log ] - > logServers . size ( ) - results . size ( ) ;
int safe_range_begin = logServers [ log ] - > tLogWriteAntiQuorum ;
int new_safe_range_begin = std : : min ( logServers [ log ] - > tLogWriteAntiQuorum , ( int ) ( results . size ( ) - 1 ) ) ;
int safe_range_end = logServers [ log ] - > tLogReplicationFactor - absent ;
2017-07-10 05:46:16 +08:00
if ( ( prevState . logSystemType = = 2 & & ( ! last_end . present ( ) | | ( ( safe_range_end > 0 ) & & ( safe_range_end - 1 < results . size ( ) ) & & results [ safe_range_end - 1 ] . end < last_end . get ( ) ) ) ) ) {
2017-07-14 03:29:21 +08:00
knownCommittedVersion = std : : max ( knownCommittedVersion , results [ new_safe_range_begin ] . end - ( g_network - > isSimulated ( ) ? 10 * SERVER_KNOBS - > VERSIONS_PER_SECOND : SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) ) ; //In simulation this must be the maximum MAX_READ_TRANSACTION_LIFE_VERSIONS
2017-07-10 05:46:16 +08:00
for ( int i = 0 ; i < results . size ( ) ; i + + ) {
knownCommittedVersion = std : : max ( knownCommittedVersion , results [ i ] . knownCommittedVersion ) ;
}
TraceEvent ( " LogSystemRecovery " , dbgid ) . detail ( " Cycles " , cycles )
. detail ( " logNum " , log )
2017-07-12 06:48:10 +08:00
. detail ( " TotalServers " , logServers [ log ] - > logServers . size ( ) )
2017-07-10 05:46:16 +08:00
. detail ( " Required " , requiredCount )
. detail ( " Present " , results . size ( ) )
. detail ( " Available " , availableItems . size ( ) )
2017-07-12 06:48:10 +08:00
. detail ( " Absent " , logServers [ log ] - > logServers . size ( ) - results . size ( ) )
2017-07-10 05:46:16 +08:00
. detail ( " ServerState " , sServerState )
2017-07-12 06:48:10 +08:00
. detail ( " ReplicationFactor " , logServers [ log ] - > tLogReplicationFactor )
. detail ( " AntiQuorum " , logServers [ log ] - > tLogWriteAntiQuorum )
. detail ( " Policy " , logServers [ log ] - > tLogPolicy - > info ( ) )
2017-07-10 05:46:16 +08:00
. detail ( " TooManyFailures " , bTooManyFailures )
. detail ( " LastVersion " , ( last_end . present ( ) ) ? last_end . get ( ) : - 1L )
. detail ( " RecoveryVersion " , ( ( safe_range_end > 0 ) & & ( safe_range_end - 1 < results . size ( ) ) ) ? results [ safe_range_end - 1 ] . end : - 1 )
. detail ( " EndVersion " , results [ new_safe_range_begin ] . end )
. detail ( " SafeBegin " , safe_range_begin )
. detail ( " SafeEnd " , safe_range_end )
. detail ( " NewSafeBegin " , new_safe_range_begin )
2017-07-12 06:48:10 +08:00
. detail ( " LogZones " , : : describeZones ( logServers [ log ] - > tLogLocalities ) )
. detail ( " LogDataHalls " , : : describeDataHalls ( logServers [ log ] - > tLogLocalities ) )
2017-07-10 05:46:16 +08:00
. detail ( " tLogs " , ( int ) prevState . tLogs . size ( ) )
. detail ( " oldTlogsSize " , ( int ) prevState . oldTLogData . size ( ) )
. detail ( " logSystemType " , prevState . logSystemType )
. detail ( " knownCommittedVersion " , knownCommittedVersion ) ;
2017-07-14 03:29:21 +08:00
if ( ! end . present ( ) | | results [ new_safe_range_begin ] . end < end . get ( ) ) {
2017-07-10 05:46:16 +08:00
end = results [ new_safe_range_begin ] . end ;
}
}
else {
TraceEvent ( " LogSystemUnchangedRecovery " , dbgid ) . detail ( " Cycles " , cycles )
. detail ( " logNum " , log )
2017-07-12 06:48:10 +08:00
. detail ( " TotalServers " , logServers [ log ] - > logServers . size ( ) )
2017-07-10 05:46:16 +08:00
. detail ( " Required " , requiredCount )
. detail ( " Present " , results . size ( ) )
. detail ( " Available " , availableItems . size ( ) )
. detail ( " ServerState " , sServerState )
2017-07-12 06:48:10 +08:00
. detail ( " ReplicationFactor " , logServers [ log ] - > tLogReplicationFactor )
. detail ( " AntiQuorum " , logServers [ log ] - > tLogWriteAntiQuorum )
. detail ( " Policy " , logServers [ log ] - > tLogPolicy - > info ( ) )
2017-07-10 05:46:16 +08:00
. detail ( " TooManyFailures " , bTooManyFailures )
. detail ( " LastVersion " , ( last_end . present ( ) ) ? last_end . get ( ) : - 1L )
. detail ( " RecoveryVersion " , ( ( safe_range_end > 0 ) & & ( safe_range_end - 1 < results . size ( ) ) ) ? results [ safe_range_end - 1 ] . end : - 1 )
. detail ( " EndVersion " , results [ new_safe_range_begin ] . end )
. detail ( " SafeBegin " , safe_range_begin )
. detail ( " SafeEnd " , safe_range_end )
. detail ( " NewSafeBegin " , new_safe_range_begin )
2017-07-12 06:48:10 +08:00
. detail ( " LogZones " , : : describeZones ( logServers [ log ] - > tLogLocalities ) )
2017-07-14 03:29:21 +08:00
. detail ( " LogDataHalls " , : : describeDataHalls ( logServers [ log ] - > tLogLocalities ) )
. detail ( " logSystemType " , prevState . logSystemType ) ;
2017-07-10 05:46:16 +08:00
}
2017-05-26 04:48:44 +08:00
}
2017-07-10 05:46:16 +08:00
// Too many failures
2017-05-26 04:48:44 +08:00
else {
2017-07-10 05:46:16 +08:00
TraceEvent ( " LogSystemWaitingForRecovery " , dbgid ) . detail ( " Cycles " , cycles )
. detail ( " logNum " , log )
. detail ( " AvailableServers " , results . size ( ) )
. detail ( " RequiredServers " , requiredCount )
2017-07-12 06:48:10 +08:00
. detail ( " TotalServers " , logServers [ log ] - > logServers . size ( ) )
2017-05-26 04:48:44 +08:00
. detail ( " Required " , requiredCount )
. detail ( " Present " , results . size ( ) )
. detail ( " Available " , availableItems . size ( ) )
. detail ( " ServerState " , sServerState )
2017-07-12 06:48:10 +08:00
. detail ( " ReplicationFactor " , logServers [ log ] - > tLogReplicationFactor )
. detail ( " AntiQuorum " , logServers [ log ] - > tLogWriteAntiQuorum )
. detail ( " Policy " , logServers [ log ] - > tLogPolicy - > info ( ) )
2017-05-26 04:48:44 +08:00
. detail ( " TooManyFailures " , bTooManyFailures )
2017-07-12 06:48:10 +08:00
. detail ( " LogZones " , : : describeZones ( logServers [ log ] - > tLogLocalities ) )
. detail ( " LogDataHalls " , : : describeDataHalls ( logServers [ log ] - > tLogLocalities ) ) ;
2017-05-26 04:48:44 +08:00
}
}
2017-07-10 05:46:16 +08:00
if ( end . present ( ) ) {
TEST ( last_end . present ( ) ) ; // Restarting recovery at an earlier point
Reference < TagPartitionedLogSystem > logSystem ( new TagPartitionedLogSystem ( dbgid , locality ) ) ;
last_end = end ;
logSystem - > tLogs = logServers ;
2018-04-09 12:24:05 +08:00
logSystem - > logRouterTags = prevState . logRouterTags ;
2017-07-10 05:46:16 +08:00
logSystem - > oldLogData = oldLogData ;
logSystem - > logSystemType = prevState . logSystemType ;
2018-03-30 06:12:38 +08:00
logSystem - > rejoins = rejoins ;
logSystem - > lockResults = lockResults ;
2017-07-10 05:46:16 +08:00
logSystem - > epochEndVersion = end . get ( ) ;
logSystem - > knownCommittedVersion = knownCommittedVersion ;
2018-04-10 02:44:54 +08:00
logSystem - > remoteLogsWrittenToCoreState = true ;
2017-07-10 05:46:16 +08:00
2018-04-09 12:24:05 +08:00
for ( int log = 0 ; log < logServers . size ( ) ; log + + ) {
if ( lockResults [ log ] . logSet - > isLocal ) {
for ( auto & r : lockResults [ log ] . replies ) {
if ( r . isReady ( ) & & ! r . isError ( ) ) {
logSystem - > epochEndTags . insert ( r . get ( ) . tags . begin ( ) , r . get ( ) . tags . end ( ) ) ;
}
2017-07-10 05:46:16 +08:00
}
}
}
outLogSystem - > set ( logSystem ) ;
2017-05-26 04:48:44 +08:00
}
// Wait for anything relevant to change
std : : vector < Future < Void > > changes ;
2017-07-10 05:46:16 +08:00
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
2017-07-12 06:48:10 +08:00
if ( logServers [ i ] - > isLocal ) {
for ( int j = 0 ; j < logServers [ i ] - > logServers . size ( ) ; j + + ) {
2018-03-30 06:12:38 +08:00
if ( ! lockResults [ i ] . replies [ j ] . isReady ( ) )
changes . push_back ( ready ( lockResults [ i ] . replies [ j ] ) ) ;
2017-07-10 05:46:16 +08:00
else {
2017-07-12 06:48:10 +08:00
changes . push_back ( logServers [ i ] - > logServers [ j ] - > onChange ( ) ) ;
2017-07-10 05:46:16 +08:00
changes . push_back ( logFailed [ i ] [ j ] - > onChange ( ) ) ;
}
}
2017-05-26 04:48:44 +08:00
}
}
ASSERT ( changes . size ( ) ) ;
Void _ = wait ( waitForAny ( changes ) ) ;
}
}
2018-04-13 06:20:54 +08:00
ACTOR static Future < Void > recruitOldLogRouters ( TagPartitionedLogSystem * self , vector < WorkerInterface > workers , LogEpoch recoveryCount , int8_t locality , Version startVersion , int logSet , bool onlyOld ) {
2018-04-09 12:24:05 +08:00
state vector < vector < Future < TLogInterface > > > logRouterInitializationReplies ;
state vector < Future < TLogInterface > > allReplies ;
int nextRouter = 0 ;
2018-04-15 10:06:24 +08:00
Version lastStart = std : : numeric_limits < Version > : : max ( ) ;
2018-04-13 06:20:54 +08:00
if ( ! onlyOld ) {
2018-04-15 10:06:24 +08:00
lastStart = std : : max ( startVersion , self - > tLogs [ 0 ] - > startVersion ) ;
2018-04-13 06:20:54 +08:00
if ( self - > logRouterTags = = 0 ) {
2018-04-23 02:54:39 +08:00
self - > logSystemConfigChanged . trigger ( ) ;
2018-04-13 06:20:54 +08:00
return Void ( ) ;
}
bool found = false ;
for ( auto & tLogs : self - > tLogs ) {
if ( tLogs - > locality = = locality ) {
found = true ;
}
2018-04-13 09:14:23 +08:00
2018-04-13 06:20:54 +08:00
tLogs - > logRouters . clear ( ) ;
}
if ( ! found ) {
Reference < LogSet > newLogSet ( new LogSet ( ) ) ;
newLogSet - > locality = locality ;
2018-04-15 10:06:24 +08:00
newLogSet - > startVersion = lastStart ;
2018-04-13 06:20:54 +08:00
newLogSet - > isLocal = false ;
self - > tLogs . push_back ( newLogSet ) ;
}
for ( auto & tLogs : self - > tLogs ) {
//Recruit log routers for old generations of the primary locality
if ( tLogs - > locality = = locality ) {
logRouterInitializationReplies . push_back ( vector < Future < TLogInterface > > ( ) ) ;
for ( int i = 0 ; i < self - > logRouterTags ; i + + ) {
InitializeLogRouterRequest req ;
req . recoveryCount = recoveryCount ;
req . routerTag = Tag ( tagLocalityLogRouter , i ) ;
req . logSet = logSet ;
2018-04-15 10:06:24 +08:00
req . startVersion = lastStart ;
2018-04-13 06:20:54 +08:00
auto reply = transformErrors ( throwErrorOr ( workers [ nextRouter ] . logRouter . getReplyUnlessFailedFor ( req , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ;
logRouterInitializationReplies . back ( ) . push_back ( reply ) ;
allReplies . push_back ( reply ) ;
nextRouter = ( nextRouter + 1 ) % workers . size ( ) ;
}
}
}
}
2018-04-09 12:24:05 +08:00
for ( auto & old : self - > oldLogData ) {
2018-04-15 10:06:24 +08:00
if ( old . logRouterTags = = 0 | | old . tLogs [ 0 ] - > startVersion > = lastStart ) {
2018-04-09 12:24:05 +08:00
break ;
2018-03-30 06:12:38 +08:00
}
2018-04-15 10:06:24 +08:00
lastStart = std : : max ( startVersion , old . tLogs [ 0 ] - > startVersion ) ;
2018-04-09 12:24:05 +08:00
bool found = false ;
for ( auto & tLogs : old . tLogs ) {
if ( tLogs - > locality = = locality ) {
2018-03-30 06:12:38 +08:00
found = true ;
2018-04-09 12:24:05 +08:00
}
2018-04-13 06:20:54 +08:00
tLogs - > logRouters . clear ( ) ;
2018-03-30 06:12:38 +08:00
}
2018-04-09 12:24:05 +08:00
if ( ! found ) {
Reference < LogSet > newLogSet ( new LogSet ( ) ) ;
newLogSet - > locality = locality ;
2018-04-15 10:06:24 +08:00
newLogSet - > startVersion = lastStart ;
2018-04-09 12:24:05 +08:00
old . tLogs . push_back ( newLogSet ) ;
2018-03-30 06:12:38 +08:00
}
for ( auto & tLogs : old . tLogs ) {
//Recruit log routers for old generations of the primary locality
2018-04-09 12:24:05 +08:00
if ( tLogs - > locality = = locality ) {
2018-03-30 06:12:38 +08:00
logRouterInitializationReplies . push_back ( vector < Future < TLogInterface > > ( ) ) ;
2018-04-09 12:24:05 +08:00
for ( int i = 0 ; i < old . logRouterTags ; i + + ) {
2018-03-30 06:12:38 +08:00
InitializeLogRouterRequest req ;
req . recoveryCount = recoveryCount ;
req . routerTag = Tag ( tagLocalityLogRouter , i ) ;
2018-04-09 12:24:05 +08:00
req . logSet = logSet ;
2018-04-15 10:06:24 +08:00
req . startVersion = lastStart ;
2018-03-30 06:12:38 +08:00
auto reply = transformErrors ( throwErrorOr ( workers [ nextRouter ] . logRouter . getReplyUnlessFailedFor ( req , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ;
logRouterInitializationReplies . back ( ) . push_back ( reply ) ;
allReplies . push_back ( reply ) ;
nextRouter = ( nextRouter + 1 ) % workers . size ( ) ;
}
}
}
}
Void _ = wait ( waitForAll ( allReplies ) ) ;
int nextReplies = 0 ;
2018-04-15 10:06:24 +08:00
Version lastStart = std : : numeric_limits < Version > : : max ( ) ;
2018-04-13 06:20:54 +08:00
if ( ! onlyOld ) {
2018-04-15 10:06:24 +08:00
lastStart = std : : max ( startVersion , self - > tLogs [ 0 ] - > startVersion ) ;
2018-04-13 06:20:54 +08:00
for ( auto & tLogs : self - > tLogs ) {
if ( tLogs - > locality = = locality ) {
for ( int i = 0 ; i < logRouterInitializationReplies [ nextReplies ] . size ( ) ; i + + ) {
tLogs - > logRouters . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( logRouterInitializationReplies [ nextReplies ] [ i ] . get ( ) ) ) ) ) ;
}
nextReplies + + ;
}
}
}
2018-03-30 06:12:38 +08:00
for ( auto & old : self - > oldLogData ) {
2018-04-15 10:06:24 +08:00
if ( old . logRouterTags = = 0 | | old . tLogs [ 0 ] - > startVersion > = lastStart ) {
2018-04-09 12:24:05 +08:00
break ;
}
2018-04-15 10:06:24 +08:00
lastStart = std : : max ( startVersion , old . tLogs [ 0 ] - > startVersion ) ;
2018-03-30 06:12:38 +08:00
for ( auto & tLogs : old . tLogs ) {
2018-04-09 12:24:05 +08:00
if ( tLogs - > locality = = locality ) {
2018-03-30 06:12:38 +08:00
for ( int i = 0 ; i < logRouterInitializationReplies [ nextReplies ] . size ( ) ; i + + ) {
tLogs - > logRouters . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( logRouterInitializationReplies [ nextReplies ] [ i ] . get ( ) ) ) ) ) ;
}
nextReplies + + ;
}
}
}
2018-04-23 02:54:39 +08:00
self - > logSystemConfigChanged . trigger ( ) ;
2018-03-30 06:12:38 +08:00
return Void ( ) ;
}
2018-02-20 08:49:57 +08:00
ACTOR static Future < Void > newRemoteEpoch ( TagPartitionedLogSystem * self , Reference < TagPartitionedLogSystem > oldLogSystem , Future < RecruitRemoteFromConfigurationReply > fRemoteWorkers , DatabaseConfiguration configuration , LogEpoch recoveryCount , int8_t remoteLocality )
2017-07-10 05:46:16 +08:00
{
2018-01-06 06:15:25 +08:00
TraceEvent ( " RemoteLogRecruitment_WaitingForWorkers " ) ;
2017-09-08 06:32:08 +08:00
state RecruitRemoteFromConfigurationReply remoteWorkers = wait ( fRemoteWorkers ) ;
2018-03-07 08:31:21 +08:00
2018-04-09 12:24:05 +08:00
if ( remoteWorkers . logRouters . size ( ) ! = self - > logRouterTags ) {
TraceEvent ( " RemoteLogRecruitment_MismatchedLogRouters " ) . detail ( " logRouterCount " , self - > logRouterTags ) . detail ( " workers " , remoteWorkers . logRouters . size ( ) ) ;
2018-03-07 08:31:21 +08:00
throw master_recovery_failed ( ) ;
}
2018-03-30 06:12:38 +08:00
2017-09-08 06:32:08 +08:00
state Reference < LogSet > logSet = Reference < LogSet > ( new LogSet ( ) ) ;
logSet - > tLogReplicationFactor = configuration . remoteTLogReplicationFactor ;
logSet - > tLogPolicy = configuration . remoteTLogPolicy ;
logSet - > isLocal = false ;
2018-01-30 09:48:18 +08:00
logSet - > hasBestPolicy = HasBestPolicyId ;
2017-09-08 06:32:08 +08:00
logSet - > locality = remoteLocality ;
2018-03-30 06:12:38 +08:00
2018-04-10 12:58:14 +08:00
logSet - > startVersion = oldLogSystem - > knownCommittedVersion + 1 ;
2018-04-09 12:24:05 +08:00
state int lockNum = 0 ;
while ( lockNum < oldLogSystem - > lockResults . size ( ) ) {
if ( oldLogSystem - > lockResults [ lockNum ] . logSet - > locality = = remoteLocality ) {
std : : pair < Version , Version > versions = wait ( TagPartitionedLogSystem : : getDurableVersion ( self - > dbgid , oldLogSystem - > lockResults [ lockNum ] ) ) ;
2018-04-22 07:03:28 +08:00
logSet - > startVersion = std : : min ( versions . first , logSet - > startVersion ) ;
2018-04-09 12:24:05 +08:00
break ;
2018-03-30 06:12:38 +08:00
}
2018-04-09 12:24:05 +08:00
lockNum + + ;
2018-03-30 06:12:38 +08:00
}
state Future < Void > oldRouterRecruitment = Void ( ) ;
2018-04-10 12:58:14 +08:00
if ( logSet - > startVersion < oldLogSystem - > knownCommittedVersion + 1 ) {
2018-04-13 06:20:54 +08:00
oldRouterRecruitment = TagPartitionedLogSystem : : recruitOldLogRouters ( self , remoteWorkers . logRouters , recoveryCount , remoteLocality , logSet - > startVersion , self - > tLogs . size ( ) , true ) ;
2018-03-30 06:12:38 +08:00
}
2017-07-10 05:46:16 +08:00
state vector < Future < TLogInterface > > logRouterInitializationReplies ;
2018-02-20 08:49:57 +08:00
for ( int i = 0 ; i < remoteWorkers . logRouters . size ( ) ; i + + ) {
2017-07-10 05:46:16 +08:00
InitializeLogRouterRequest req ;
req . recoveryCount = recoveryCount ;
2017-08-04 07:16:36 +08:00
req . routerTag = Tag ( tagLocalityLogRouter , i ) ;
2017-09-08 06:32:08 +08:00
req . logSet = self - > tLogs . size ( ) ;
2018-04-16 05:54:22 +08:00
req . startVersion = std : : max ( self - > tLogs [ 0 ] - > startVersion , logSet - > startVersion ) ;
2018-02-20 08:49:57 +08:00
logRouterInitializationReplies . push_back ( transformErrors ( throwErrorOr ( remoteWorkers . logRouters [ i ] . logRouter . getReplyUnlessFailedFor ( req , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ) ;
2017-07-10 05:46:16 +08:00
}
2018-04-09 12:24:05 +08:00
TraceEvent ( " RemoteLogRecruitment_RecruitingLogRouters " ) . detail ( " startVersion " , logSet - > startVersion ) ;
2018-03-30 06:12:38 +08:00
Void _ = wait ( waitForAll ( logRouterInitializationReplies ) & & oldRouterRecruitment ) ;
2017-07-10 05:46:16 +08:00
for ( int i = 0 ; i < logRouterInitializationReplies . size ( ) ; i + + ) {
2017-09-08 06:32:08 +08:00
logSet - > logRouters . push_back ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( logRouterInitializationReplies [ i ] . get ( ) ) ) ) ) ;
2017-07-10 05:46:16 +08:00
}
state double startTime = now ( ) ;
state vector < Future < TLogInterface > > remoteTLogInitializationReplies ;
2017-09-08 06:32:08 +08:00
vector < InitializeTLogRequest > remoteTLogReqs ( remoteWorkers . remoteTLogs . size ( ) ) ;
2017-07-10 05:46:16 +08:00
2018-04-01 07:47:56 +08:00
std : : vector < Tag > allTags ( self - > epochEndTags . begin ( ) , self - > epochEndTags . end ( ) ) ;
2017-09-08 06:32:08 +08:00
for ( int i = 0 ; i < remoteWorkers . remoteTLogs . size ( ) ; i + + ) {
2017-07-10 05:46:16 +08:00
InitializeTLogRequest & req = remoteTLogReqs [ i ] ;
2018-04-21 04:25:22 +08:00
req . recruitmentID = self - > recruitmentID ;
2017-07-10 05:46:16 +08:00
req . storeType = configuration . tLogDataStoreType ;
2017-07-16 06:15:03 +08:00
req . recoverFrom = oldLogSystem - > getLogSystemConfig ( ) ;
req . recoverAt = oldLogSystem - > epochEndVersion . get ( ) ;
req . knownCommittedVersion = oldLogSystem - > knownCommittedVersion ;
2017-07-10 05:46:16 +08:00
req . epoch = recoveryCount ;
2017-08-04 07:16:36 +08:00
req . remoteTag = Tag ( tagLocalityRemoteLog , i ) ;
2018-03-30 06:12:38 +08:00
req . locality = remoteLocality ;
req . isPrimary = false ;
2018-04-01 07:47:56 +08:00
req . allTags = allTags ;
2018-04-09 12:24:05 +08:00
req . startVersion = logSet - > startVersion ;
req . logRouterTags = 0 ;
2017-07-10 05:46:16 +08:00
}
2017-09-08 06:32:08 +08:00
logSet - > tLogLocalities . resize ( remoteWorkers . remoteTLogs . size ( ) ) ;
logSet - > logServers . resize ( remoteWorkers . remoteTLogs . size ( ) ) ; // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSet - > updateLocalitySet ( remoteWorkers . remoteTLogs ) ;
2018-01-06 03:33:42 +08:00
filterLocalityDataForPolicy ( logSet - > tLogPolicy , & logSet - > tLogLocalities ) ;
2017-07-10 05:46:16 +08:00
2017-09-08 06:32:08 +08:00
for ( int i = 0 ; i < remoteWorkers . remoteTLogs . size ( ) ; i + + )
remoteTLogInitializationReplies . push_back ( transformErrors ( throwErrorOr ( remoteWorkers . remoteTLogs [ i ] . tLog . getReplyUnlessFailedFor ( remoteTLogReqs [ i ] , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ) ;
2017-07-10 05:46:16 +08:00
2018-01-06 06:15:25 +08:00
TraceEvent ( " RemoteLogRecruitment_InitializingRemoteLogs " ) ;
2017-07-10 05:46:16 +08:00
Void _ = wait ( waitForAll ( remoteTLogInitializationReplies ) ) ;
for ( int i = 0 ; i < remoteTLogInitializationReplies . size ( ) ; i + + ) {
2017-09-08 06:32:08 +08:00
logSet - > logServers [ i ] = Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( remoteTLogInitializationReplies [ i ] . get ( ) ) ) ) ;
logSet - > tLogLocalities [ i ] = remoteWorkers . remoteTLogs [ i ] . locality ;
2017-07-10 05:46:16 +08:00
}
std : : vector < Future < Void > > recoveryComplete ;
2017-09-08 06:32:08 +08:00
for ( int i = 0 ; i < logSet - > logServers . size ( ) ; i + + )
recoveryComplete . push_back ( transformErrors ( throwErrorOr ( logSet - > logServers [ i ] - > get ( ) . interf ( ) . recoveryFinished . getReplyUnlessFailedFor ( TLogRecoveryFinishedRequest ( ) , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ) ;
2017-07-10 05:46:16 +08:00
2017-07-14 03:29:21 +08:00
self - > remoteRecoveryComplete = waitForAll ( recoveryComplete ) ;
2017-09-08 06:32:08 +08:00
self - > tLogs . push_back ( logSet ) ;
2018-01-06 06:15:25 +08:00
TraceEvent ( " RemoteLogRecruitment_CompletingRecovery " ) ;
2017-07-10 05:46:16 +08:00
return Void ( ) ;
}
2017-09-08 06:32:08 +08:00
ACTOR static Future < Reference < ILogSystem > > newEpoch ( Reference < TagPartitionedLogSystem > oldLogSystem , RecruitFromConfigurationReply recr , Future < RecruitRemoteFromConfigurationReply > fRemoteWorkers , DatabaseConfiguration configuration , LogEpoch recoveryCount , int8_t primaryLocality , int8_t remoteLocality )
2017-05-26 04:48:44 +08:00
{
state double startTime = now ( ) ;
state Reference < TagPartitionedLogSystem > logSystem ( new TagPartitionedLogSystem ( oldLogSystem - > getDebugID ( ) , oldLogSystem - > locality ) ) ;
logSystem - > logSystemType = 2 ;
2017-09-08 06:32:08 +08:00
logSystem - > expectedLogSets = 1 ;
2018-03-31 10:08:01 +08:00
logSystem - > epochEndTags = oldLogSystem - > getEpochEndTags ( ) ;
2018-04-21 04:25:22 +08:00
logSystem - > recruitmentID = g_random - > randomUniqueID ( ) ;
oldLogSystem - > recruitmentID = logSystem - > recruitmentID ;
2017-07-10 05:46:16 +08:00
2017-09-08 06:32:08 +08:00
logSystem - > tLogs . push_back ( Reference < LogSet > ( new LogSet ( ) ) ) ;
2017-07-12 06:48:10 +08:00
logSystem - > tLogs [ 0 ] - > tLogWriteAntiQuorum = configuration . tLogWriteAntiQuorum ;
logSystem - > tLogs [ 0 ] - > tLogReplicationFactor = configuration . tLogReplicationFactor ;
logSystem - > tLogs [ 0 ] - > tLogPolicy = configuration . tLogPolicy ;
logSystem - > tLogs [ 0 ] - > isLocal = true ;
2018-01-30 09:48:18 +08:00
logSystem - > tLogs [ 0 ] - > hasBestPolicy = HasBestPolicyId ;
2017-09-08 06:32:08 +08:00
logSystem - > tLogs [ 0 ] - > locality = primaryLocality ;
2018-03-30 06:12:38 +08:00
state RegionInfo region = configuration . getRegion ( recr . dcId ) ;
2018-03-06 11:27:46 +08:00
if ( region . satelliteTLogReplicationFactor > 0 ) {
2017-09-08 06:32:08 +08:00
logSystem - > tLogs . push_back ( Reference < LogSet > ( new LogSet ( ) ) ) ;
2018-03-06 11:27:46 +08:00
logSystem - > tLogs [ 1 ] - > tLogWriteAntiQuorum = region . satelliteTLogWriteAntiQuorum ;
logSystem - > tLogs [ 1 ] - > tLogReplicationFactor = region . satelliteTLogReplicationFactor ;
logSystem - > tLogs [ 1 ] - > tLogPolicy = region . satelliteTLogPolicy ;
2017-09-08 06:32:08 +08:00
logSystem - > tLogs [ 1 ] - > isLocal = true ;
2018-01-30 09:48:18 +08:00
logSystem - > tLogs [ 1 ] - > hasBestPolicy = HasBestPolicyNone ;
2018-03-30 06:12:38 +08:00
logSystem - > tLogs [ 1 ] - > locality = tagLocalityInvalid ;
2018-04-10 12:58:14 +08:00
logSystem - > tLogs [ 1 ] - > startVersion = oldLogSystem - > knownCommittedVersion + 1 ;
2017-09-08 06:32:08 +08:00
logSystem - > expectedLogSets + + ;
}
2017-07-10 05:46:16 +08:00
2017-09-08 06:32:08 +08:00
if ( configuration . remoteTLogReplicationFactor > 0 ) {
2018-04-09 12:24:05 +08:00
logSystem - > logRouterTags = recr . tLogs . size ( ) ;
2017-09-08 06:32:08 +08:00
logSystem - > expectedLogSets + + ;
2018-02-14 09:01:34 +08:00
} else {
2018-04-09 12:24:05 +08:00
logSystem - > logRouterTags = 0 ;
2017-09-08 06:32:08 +08:00
}
2017-07-10 05:46:16 +08:00
if ( oldLogSystem - > tLogs . size ( ) ) {
2017-05-26 04:48:44 +08:00
logSystem - > oldLogData . push_back ( OldLogData ( ) ) ;
2017-07-10 05:46:16 +08:00
logSystem - > oldLogData [ 0 ] . tLogs = oldLogSystem - > tLogs ;
2017-05-26 04:48:44 +08:00
logSystem - > oldLogData [ 0 ] . epochEnd = oldLogSystem - > knownCommittedVersion + 1 ;
2018-04-13 06:20:54 +08:00
logSystem - > oldLogData [ 0 ] . logRouterTags = oldLogSystem - > logRouterTags ;
2017-05-26 04:48:44 +08:00
}
for ( int i = 0 ; i < oldLogSystem - > oldLogData . size ( ) ; i + + ) {
logSystem - > oldLogData . push_back ( oldLogSystem - > oldLogData [ i ] ) ;
}
2018-04-10 12:58:14 +08:00
logSystem - > tLogs [ 0 ] - > startVersion = oldLogSystem - > knownCommittedVersion + 1 ;
2018-04-09 12:24:05 +08:00
state int lockNum = 0 ;
while ( lockNum < oldLogSystem - > lockResults . size ( ) ) {
if ( oldLogSystem - > lockResults [ lockNum ] . logSet - > locality = = primaryLocality ) {
2018-04-23 02:14:13 +08:00
if ( oldLogSystem - > lockResults [ lockNum ] . isCurrent & & oldLogSystem - > lockResults [ lockNum ] . logSet - > isLocal ) {
break ;
}
2018-04-09 12:24:05 +08:00
std : : pair < Version , Version > versions = wait ( TagPartitionedLogSystem : : getDurableVersion ( logSystem - > dbgid , oldLogSystem - > lockResults [ lockNum ] ) ) ;
2018-04-22 07:03:28 +08:00
logSystem - > tLogs [ 0 ] - > startVersion = std : : min ( versions . first , logSystem - > tLogs [ 0 ] - > startVersion ) ;
2018-04-09 12:24:05 +08:00
break ;
2018-03-30 06:12:38 +08:00
}
2018-04-09 12:24:05 +08:00
lockNum + + ;
2018-03-30 06:12:38 +08:00
}
2018-04-09 12:24:05 +08:00
2018-03-30 06:12:38 +08:00
state Future < Void > oldRouterRecruitment = Void ( ) ;
2018-04-10 12:58:14 +08:00
if ( logSystem - > tLogs [ 0 ] - > startVersion < oldLogSystem - > knownCommittedVersion + 1 ) {
2018-04-13 06:20:54 +08:00
oldRouterRecruitment = TagPartitionedLogSystem : : recruitOldLogRouters ( oldLogSystem . getPtr ( ) , recr . oldLogRouters , recoveryCount , primaryLocality , logSystem - > tLogs [ 0 ] - > startVersion , 0 , false ) ;
2018-04-23 02:54:39 +08:00
} else {
oldLogSystem - > logSystemConfigChanged . trigger ( ) ;
2018-03-30 06:12:38 +08:00
}
2017-05-26 04:48:44 +08:00
state vector < Future < TLogInterface > > initializationReplies ;
2017-09-08 06:32:08 +08:00
vector < InitializeTLogRequest > reqs ( recr . tLogs . size ( ) ) ;
2017-05-26 04:48:44 +08:00
2018-04-01 07:47:56 +08:00
std : : vector < Tag > allTags ( logSystem - > epochEndTags . begin ( ) , logSystem - > epochEndTags . end ( ) ) ;
2017-09-08 06:32:08 +08:00
for ( int i = 0 ; i < recr . tLogs . size ( ) ; i + + ) {
2017-05-26 04:48:44 +08:00
InitializeTLogRequest & req = reqs [ i ] ;
2018-04-21 04:25:22 +08:00
req . recruitmentID = logSystem - > recruitmentID ;
2017-05-26 04:48:44 +08:00
req . storeType = configuration . tLogDataStoreType ;
req . recoverFrom = oldLogSystem - > getLogSystemConfig ( ) ;
req . recoverAt = oldLogSystem - > epochEndVersion . get ( ) ;
req . knownCommittedVersion = oldLogSystem - > knownCommittedVersion ;
req . epoch = recoveryCount ;
2018-03-30 06:12:38 +08:00
req . locality = primaryLocality ;
req . remoteTag = Tag ( tagLocalityRemoteLog , i ) ;
req . isPrimary = true ;
2018-04-01 07:47:56 +08:00
req . allTags = allTags ;
2018-04-09 12:24:05 +08:00
req . startVersion = logSystem - > tLogs [ 0 ] - > startVersion ;
req . logRouterTags = logSystem - > logRouterTags ;
2017-05-26 04:48:44 +08:00
}
2017-09-08 06:32:08 +08:00
logSystem - > tLogs [ 0 ] - > tLogLocalities . resize ( recr . tLogs . size ( ) ) ;
logSystem - > tLogs [ 0 ] - > logServers . resize ( recr . tLogs . size ( ) ) ; // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem - > tLogs [ 0 ] - > updateLocalitySet ( recr . tLogs ) ;
2018-01-06 03:33:42 +08:00
filterLocalityDataForPolicy ( logSystem - > tLogs [ 0 ] - > tLogPolicy , & logSystem - > tLogs [ 0 ] - > tLogLocalities ) ;
2017-05-26 04:48:44 +08:00
std : : vector < int > locations ;
for ( Tag tag : oldLogSystem - > getEpochEndTags ( ) ) {
2018-04-10 01:48:57 +08:00
locations . clear ( ) ;
logSystem - > tLogs [ 0 ] - > getPushLocations ( vector < Tag > ( 1 , tag ) , locations , 0 ) ;
for ( int loc : locations )
reqs [ loc ] . recoverTags . push_back ( tag ) ;
2017-05-26 04:48:44 +08:00
}
2017-09-08 06:32:08 +08:00
for ( int i = 0 ; i < recr . tLogs . size ( ) ; i + + )
initializationReplies . push_back ( transformErrors ( throwErrorOr ( recr . tLogs [ i ] . tLog . getReplyUnlessFailedFor ( reqs [ i ] , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ) ;
state std : : vector < Future < Void > > recoveryComplete ;
2018-03-06 11:27:46 +08:00
if ( region . satelliteTLogReplicationFactor > 0 ) {
2017-09-08 06:32:08 +08:00
state vector < Future < TLogInterface > > satelliteInitializationReplies ;
vector < InitializeTLogRequest > sreqs ( recr . satelliteTLogs . size ( ) ) ;
for ( int i = 0 ; i < recr . satelliteTLogs . size ( ) ; i + + ) {
InitializeTLogRequest & req = sreqs [ i ] ;
2018-04-21 04:25:22 +08:00
req . recruitmentID = logSystem - > recruitmentID ;
2017-09-08 06:32:08 +08:00
req . storeType = configuration . tLogDataStoreType ;
req . recoverFrom = oldLogSystem - > getLogSystemConfig ( ) ;
req . recoverAt = oldLogSystem - > epochEndVersion . get ( ) ;
req . knownCommittedVersion = oldLogSystem - > knownCommittedVersion ;
req . epoch = recoveryCount ;
2018-03-30 06:12:38 +08:00
req . locality = tagLocalityInvalid ;
req . remoteTag = Tag ( ) ;
req . isPrimary = true ;
2018-04-01 07:47:56 +08:00
req . allTags = allTags ;
2018-04-10 12:58:14 +08:00
req . startVersion = oldLogSystem - > knownCommittedVersion + 1 ;
2018-04-09 12:24:05 +08:00
req . logRouterTags = logSystem - > logRouterTags ;
2017-09-08 06:32:08 +08:00
}
logSystem - > tLogs [ 1 ] - > tLogLocalities . resize ( recr . satelliteTLogs . size ( ) ) ;
logSystem - > tLogs [ 1 ] - > logServers . resize ( recr . satelliteTLogs . size ( ) ) ; // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem - > tLogs [ 1 ] - > updateLocalitySet ( recr . satelliteTLogs ) ;
2018-01-06 03:33:42 +08:00
filterLocalityDataForPolicy ( logSystem - > tLogs [ 1 ] - > tLogPolicy , & logSystem - > tLogs [ 1 ] - > tLogLocalities ) ;
2017-09-08 06:32:08 +08:00
for ( Tag tag : oldLogSystem - > getEpochEndTags ( ) ) {
2018-04-10 01:48:57 +08:00
locations . clear ( ) ;
logSystem - > tLogs [ 1 ] - > getPushLocations ( vector < Tag > ( 1 , tag ) , locations , 0 ) ;
for ( int loc : locations )
sreqs [ loc ] . recoverTags . push_back ( tag ) ;
2017-09-08 06:32:08 +08:00
}
for ( int i = 0 ; i < recr . satelliteTLogs . size ( ) ; i + + )
satelliteInitializationReplies . push_back ( transformErrors ( throwErrorOr ( recr . satelliteTLogs [ i ] . tLog . getReplyUnlessFailedFor ( sreqs [ i ] , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ) ;
Void _ = wait ( waitForAll ( satelliteInitializationReplies ) ) ;
for ( int i = 0 ; i < satelliteInitializationReplies . size ( ) ; i + + ) {
logSystem - > tLogs [ 1 ] - > logServers [ i ] = Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( satelliteInitializationReplies [ i ] . get ( ) ) ) ) ;
logSystem - > tLogs [ 1 ] - > tLogLocalities [ i ] = recr . satelliteTLogs [ i ] . locality ;
}
for ( int i = 0 ; i < logSystem - > tLogs [ 1 ] - > logServers . size ( ) ; i + + )
recoveryComplete . push_back ( transformErrors ( throwErrorOr ( logSystem - > tLogs [ 1 ] - > logServers [ i ] - > get ( ) . interf ( ) . recoveryFinished . getReplyUnlessFailedFor ( TLogRecoveryFinishedRequest ( ) , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ) ;
}
2017-05-26 04:48:44 +08:00
2018-03-30 06:12:38 +08:00
Void _ = wait ( waitForAll ( initializationReplies ) & & oldRouterRecruitment ) ;
2017-05-26 04:48:44 +08:00
for ( int i = 0 ; i < initializationReplies . size ( ) ; i + + ) {
2017-07-12 06:48:10 +08:00
logSystem - > tLogs [ 0 ] - > logServers [ i ] = Reference < AsyncVar < OptionalInterface < TLogInterface > > > ( new AsyncVar < OptionalInterface < TLogInterface > > ( OptionalInterface < TLogInterface > ( initializationReplies [ i ] . get ( ) ) ) ) ;
2017-09-08 06:32:08 +08:00
logSystem - > tLogs [ 0 ] - > tLogLocalities [ i ] = recr . tLogs [ i ] . locality ;
2017-05-26 04:48:44 +08:00
}
2018-01-06 03:33:42 +08:00
filterLocalityDataForPolicy ( logSystem - > tLogs [ 0 ] - > tLogPolicy , & logSystem - > tLogs [ 0 ] - > tLogLocalities ) ;
2017-05-26 04:48:44 +08:00
//Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running recoveries causing tests to timeout
if ( BUGGIFY & & now ( ) - startTime < 300 & & g_network - > isSimulated ( ) & & g_simulator . speedUpSimulation ) throw master_recovery_failed ( ) ;
2017-07-12 06:48:10 +08:00
for ( int i = 0 ; i < logSystem - > tLogs [ 0 ] - > logServers . size ( ) ; i + + )
recoveryComplete . push_back ( transformErrors ( throwErrorOr ( logSystem - > tLogs [ 0 ] - > logServers [ i ] - > get ( ) . interf ( ) . recoveryFinished . getReplyUnlessFailedFor ( TLogRecoveryFinishedRequest ( ) , SERVER_KNOBS - > TLOG_TIMEOUT , SERVER_KNOBS - > MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ) , master_recovery_failed ( ) ) ) ;
2017-05-26 04:48:44 +08:00
logSystem - > recoveryComplete = waitForAll ( recoveryComplete ) ;
2017-09-08 06:32:08 +08:00
if ( configuration . remoteTLogReplicationFactor > 0 ) {
2018-02-03 03:46:04 +08:00
logSystem - > hasRemoteServers = true ;
2018-02-20 08:49:57 +08:00
logSystem - > remoteRecovery = TagPartitionedLogSystem : : newRemoteEpoch ( logSystem . getPtr ( ) , oldLogSystem , fRemoteWorkers , configuration , recoveryCount , remoteLocality ) ;
2018-01-17 10:12:40 +08:00
} else {
2018-02-03 03:46:04 +08:00
logSystem - > hasRemoteServers = false ;
2018-01-18 09:03:17 +08:00
logSystem - > remoteRecovery = logSystem - > recoveryComplete ;
logSystem - > remoteRecoveryComplete = logSystem - > recoveryComplete ;
2017-09-08 06:32:08 +08:00
}
2017-05-26 04:48:44 +08:00
return logSystem ;
}
ACTOR static Future < Void > trackRejoins ( UID dbgid , std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > logServers , FutureStream < struct TLogRejoinRequest > rejoinRequests ) {
state std : : map < UID , ReplyPromise < bool > > lastReply ;
try {
loop {
TLogRejoinRequest req = waitNext ( rejoinRequests ) ;
int pos = - 1 ;
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
if ( logServers [ i ] - > get ( ) . id ( ) = = req . myInterface . id ( ) ) {
pos = i ;
break ;
}
}
if ( pos ! = - 1 ) {
TraceEvent ( " TLogJoinedMe " , dbgid ) . detail ( " TLog " , req . myInterface . id ( ) ) . detail ( " Address " , req . myInterface . commit . getEndpoint ( ) . address . toString ( ) ) ;
if ( ! logServers [ pos ] - > get ( ) . present ( ) | | req . myInterface . commit . getEndpoint ( ) ! = logServers [ pos ] - > get ( ) . interf ( ) . commit . getEndpoint ( ) )
logServers [ pos ] - > setUnconditional ( OptionalInterface < TLogInterface > ( req . myInterface ) ) ;
lastReply [ req . myInterface . id ( ) ] . send ( false ) ;
lastReply [ req . myInterface . id ( ) ] = req . reply ;
}
else {
TraceEvent ( " TLogJoinedMeUnknown " , dbgid ) . detail ( " TLog " , req . myInterface . id ( ) ) . detail ( " Address " , req . myInterface . commit . getEndpoint ( ) . address . toString ( ) ) ;
req . reply . send ( true ) ;
}
}
} catch ( . . . ) {
for ( auto it = lastReply . begin ( ) ; it ! = lastReply . end ( ) ; + + it )
it - > second . send ( true ) ;
throw ;
}
}
ACTOR static Future < TLogLockResult > lockTLog ( UID myID , Reference < AsyncVar < OptionalInterface < TLogInterface > > > tlog ) {
TraceEvent ( " TLogLockStarted " , myID ) . detail ( " TLog " , tlog - > get ( ) . id ( ) ) ;
loop {
choose {
when ( TLogLockResult data = wait ( tlog - > get ( ) . present ( ) ? brokenPromiseToNever ( tlog - > get ( ) . interf ( ) . lock . getReply < TLogLockResult > ( ) ) : Never ( ) ) ) {
TraceEvent ( " TLogLocked " , myID ) . detail ( " TLog " , tlog - > get ( ) . id ( ) ) . detail ( " end " , data . end ) ;
return data ;
}
when ( Void _ = wait ( tlog - > onChange ( ) ) ) { }
}
}
}
2017-10-06 08:09:44 +08:00
//FIXME: disabled during merge, update and use in epochEnd()
/*
2017-08-29 04:47:35 +08:00
static void lockMinimalTLogSet ( const UID & dbgid , const DBCoreState & prevState ,
const std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > & logServers ,
const std : : vector < Reference < AsyncVar < bool > > > & logFailed ,
vector < Future < TLogLockResult > > * tLogReply ) {
// Invariant: tLogReply[i] must correspond to the tlog stored as logServers[i].
ASSERT ( tLogReply - > size ( ) = = prevState . tLogLocalities . size ( ) ) ;
ASSERT ( logFailed . size ( ) = = tLogReply - > size ( ) ) ;
// For any given index, only one of the following will be true.
auto locking_completed = [ & logFailed , tLogReply ] ( int index ) {
const auto & entry = tLogReply - > at ( index ) ;
2017-09-26 09:14:40 +08:00
return ! logFailed [ index ] - > get ( ) & & entry . isValid ( ) & & entry . isReady ( ) & & ! entry . isError ( ) ;
2017-08-29 04:47:35 +08:00
} ;
auto locking_failed = [ & logFailed , tLogReply ] ( int index ) {
const auto & entry = tLogReply - > at ( index ) ;
2017-09-26 09:14:40 +08:00
return logFailed [ index ] - > get ( ) | | ( entry . isValid ( ) & & entry . isReady ( ) & & entry . isError ( ) ) ;
2017-08-29 04:47:35 +08:00
} ;
auto locking_pending = [ & logFailed , tLogReply ] ( int index ) {
const auto & entry = tLogReply - > at ( index ) ;
return ! logFailed [ index ] - > get ( ) & & ( entry . isValid ( ) & & ! entry . isReady ( ) ) ;
} ;
auto locking_skipped = [ & logFailed , tLogReply ] ( int index ) {
const auto & entry = tLogReply - > at ( index ) ;
return ! logFailed [ index ] - > get ( ) & & ! entry . isValid ( ) ;
} ;
auto can_obtain_quorum = [ & prevState ] ( std : : function < bool ( int ) > filter ) {
LocalityGroup filter_true ;
std : : vector < LocalityData > filter_false , unused ;
for ( int i = 0 ; i < prevState . tLogLocalities . size ( ) ; i + + ) {
if ( filter ( i ) ) {
filter_true . add ( prevState . tLogLocalities [ i ] ) ;
} else {
filter_false . push_back ( prevState . tLogLocalities [ i ] ) ;
}
}
bool valid = filter_true . validate ( prevState . tLogPolicy ) ;
2017-09-23 07:19:16 +08:00
if ( ! valid & & prevState . tLogWriteAntiQuorum > 0 ) {
2017-08-29 04:47:35 +08:00
valid = ! validateAllCombinations ( unused , filter_true , prevState . tLogPolicy , filter_false , prevState . tLogWriteAntiQuorum , false ) ;
}
return valid ;
} ;
2017-09-23 06:07:57 +08:00
// Step 1: Verify that if all the failed TLogs come back, they can't form a quorum.
2017-08-29 04:47:35 +08:00
if ( can_obtain_quorum ( locking_failed ) ) {
TraceEvent ( SevInfo , " MasterRecoveryTLogLockingImpossible " , dbgid ) ;
return ;
}
2017-09-23 06:07:57 +08:00
// Step 2: It's possible for us to succeed, but we need to lock additional logs.
2017-08-29 04:47:35 +08:00
//
// First, we need an accurate picture of what TLogs we're capable of locking. We can't tell the
// difference between a temporarily failed TLog and a permanently failed TLog. Thus, we assume
// all failures are permanent, and manually re-issue lock requests if they rejoin.
for ( int i = 0 ; i < logFailed . size ( ) ; i + + ) {
const auto & r = tLogReply - > at ( i ) ;
TEST ( locking_failed ( i ) & & ( r . isValid ( ) & & ! r . isReady ( ) ) ) ; // A TLog failed with a pending request.
// The reboot_a_tlog BUGGIFY below should cause the above case to be hit.
if ( locking_failed ( i ) ) {
tLogReply - > at ( i ) = Future < TLogLockResult > ( ) ;
}
}
// We're trying to paritition the set of old tlogs into two sets, L and R, such that:
// (1). R does not validate the policy
// (2). |R| is as large as possible
// (3). L contains all the already-locked TLogs
// and then we only issue lock requests to TLogs in L. This is safe, as R does not have quorum,
// so no commits may occur. It does not matter if L forms a quorum or not.
//
// We form these sets by starting with L as all machines and R as the empty set, and moving a
// random machine from L to R until (1) or (2) no longer holds as true. Code-wise, L is
// [0..end-can_omit), and R is [end-can_omit..end), and we move a random machine via randomizing
// the order of the tlogs. Choosing a random machine was verified to generate a good-enough
// result to be interesting intests sufficiently frequently that we don't need to try to
// calculate the exact optimal solution.
std : : vector < std : : pair < LocalityData , int > > tlogs ;
for ( int i = 0 ; i < prevState . tLogLocalities . size ( ) ; i + + ) {
tlogs . emplace_back ( prevState . tLogLocalities [ i ] , i ) ;
}
g_random - > randomShuffle ( tlogs ) ;
// Rearrange the array such that things that the left is logs closer to being locked, and
// the right is logs that can't be locked. This makes us prefer locking already-locked TLogs,
// which is how we respect the decisions made in the previous execution.
auto idx_to_order = [ & locking_completed , & locking_failed , & locking_pending , & locking_skipped ] ( int index ) {
2017-09-26 09:14:40 +08:00
bool complete = locking_completed ( index ) ;
bool pending = locking_pending ( index ) ;
bool skipped = locking_skipped ( index ) ;
bool failed = locking_failed ( index ) ;
ASSERT ( complete + pending + skipped + failed = = 1 ) ;
if ( complete ) return 0 ;
if ( pending ) return 1 ;
if ( skipped ) return 2 ;
if ( failed ) return 3 ;
2017-08-29 04:47:35 +08:00
ASSERT ( false ) ; // Programmer error.
return - 1 ;
} ;
std : : sort ( tlogs . begin ( ) , tlogs . end ( ) ,
// TODO: Change long type to `auto` once toolchain supports C++17.
[ & idx_to_order ] ( const std : : pair < LocalityData , int > & lhs , const std : : pair < LocalityData , int > & rhs ) {
return idx_to_order ( lhs . second ) < idx_to_order ( rhs . second ) ;
} ) ;
// Indexes that aren't in the vector are the ones we're considering omitting. Remove indexes until
// the removed set forms a quorum.
int can_omit = 0 ;
std : : vector < int > to_lock_indexes ;
for ( auto it = tlogs . cbegin ( ) ; it ! = tlogs . cend ( ) - 1 ; it + + ) {
to_lock_indexes . push_back ( it - > second ) ;
}
auto filter = [ & to_lock_indexes ] ( int index ) {
return std : : find ( to_lock_indexes . cbegin ( ) , to_lock_indexes . cend ( ) , index ) = = to_lock_indexes . cend ( ) ;
} ;
while ( true ) {
if ( can_obtain_quorum ( filter ) ) {
break ;
} else {
can_omit + + ;
ASSERT ( can_omit < tlogs . size ( ) ) ;
to_lock_indexes . pop_back ( ) ;
}
}
2017-09-23 07:19:16 +08:00
if ( prevState . tLogReplicationFactor - prevState . tLogWriteAntiQuorum = = 1 ) {
2017-08-29 04:47:35 +08:00
ASSERT ( can_omit = = 0 ) ;
}
// Our previous check of making sure there aren't too many failed logs should have prevented this.
ASSERT ( ! locking_failed ( tlogs [ tlogs . size ( ) - can_omit - 1 ] . second ) ) ;
// If we've managed to leave more tlogs unlocked than (RF-AQ), it means we've hit the case
// where the policy engine has allowed us to have multiple logs in the same failure domain
// with independant sets of data. This case will validated that no code is relying on the old
// quorum=(RF-AQ) logic, and now goes through the policy engine instead.
TEST ( can_omit > = prevState . tLogReplicationFactor - prevState . tLogWriteAntiQuorum ) ; // Locking a subset of the TLogs while ending an epoch.
2017-09-19 03:46:29 +08:00
const bool reboot_a_tlog = g_network - > now ( ) - g_simulator . lastConnectionFailure > g_simulator . connectionFailuresDisableDuration & & BUGGIFY & & g_random - > random01 ( ) < 0.25 ;
2017-08-29 04:47:35 +08:00
TraceEvent ( SevInfo , " MasterRecoveryTLogLocking " , dbgid )
. detail ( " locks " , tlogs . size ( ) - can_omit )
. detail ( " skipped " , can_omit )
. detail ( " replication " , prevState . tLogReplicationFactor )
. detail ( " antiquorum " , prevState . tLogWriteAntiQuorum )
. detail ( " reboot_buggify " , reboot_a_tlog ) ;
for ( int i = 0 ; i < tlogs . size ( ) - can_omit ; i + + ) {
const int index = tlogs [ i ] . second ;
Future < TLogLockResult > & entry = tLogReply - > at ( index ) ;
if ( ! entry . isValid ( ) ) {
entry = lockTLog ( dbgid , logServers [ index ] ) ;
}
}
if ( reboot_a_tlog ) {
2017-09-19 03:46:29 +08:00
g_simulator . lastConnectionFailure = g_network - > now ( ) ;
2017-08-29 04:47:35 +08:00
for ( int i = 0 ; i < tlogs . size ( ) - can_omit ; i + + ) {
const int index = tlogs [ i ] . second ;
if ( logServers [ index ] - > get ( ) . present ( ) ) {
g_simulator . rebootProcess (
g_simulator . getProcessByAddress (
logServers [ index ] - > get ( ) . interf ( ) . address ( ) ) ,
ISimulator : : RebootProcess ) ;
break ;
}
}
}
// Intentionally leave `tlogs.size() - can_omit` .. `tlogs.size()` as !isValid() Futures.
2017-10-06 08:09:44 +08:00
} */
2017-08-29 04:47:35 +08:00
2017-05-26 04:48:44 +08:00
template < class T >
static vector < T > getReadyNonError ( vector < Future < T > > const & futures ) {
// Return the values of those futures which have (non-error) values ready
std : : vector < T > result ;
for ( auto & f : futures )
if ( f . isReady ( ) & & ! f . isError ( ) )
result . push_back ( f . get ( ) ) ;
return result ;
}
struct sort_by_end {
bool operator ( ) ( TLogLockResult const & a , TLogLockResult const & b ) const { return a . end < b . end ; }
} ;
} ;
Future < Void > ILogSystem : : recoverAndEndEpoch ( Reference < AsyncVar < Reference < ILogSystem > > > const & outLogSystem , UID const & dbgid , DBCoreState const & oldState , FutureStream < TLogRejoinRequest > const & rejoins , LocalityData const & locality ) {
return TagPartitionedLogSystem : : recoverAndEndEpoch ( outLogSystem , dbgid , oldState , rejoins , locality ) ;
}
2018-02-22 06:06:44 +08:00
Reference < ILogSystem > ILogSystem : : fromLogSystemConfig ( UID const & dbgid , struct LocalityData const & locality , struct LogSystemConfig const & conf , bool excludeRemote ) {
2017-05-26 04:48:44 +08:00
if ( conf . logSystemType = = 0 )
return Reference < ILogSystem > ( ) ;
else if ( conf . logSystemType = = 2 )
2018-02-22 06:06:44 +08:00
return TagPartitionedLogSystem : : fromLogSystemConfig ( dbgid , locality , conf , excludeRemote ) ;
2017-05-26 04:48:44 +08:00
else
throw internal_error ( ) ;
}
Reference < ILogSystem > ILogSystem : : fromOldLogSystemConfig ( UID const & dbgid , struct LocalityData const & locality , struct LogSystemConfig const & conf ) {
if ( conf . logSystemType = = 0 )
return Reference < ILogSystem > ( ) ;
else if ( conf . logSystemType = = 2 )
return TagPartitionedLogSystem : : fromOldLogSystemConfig ( dbgid , locality , conf ) ;
else
throw internal_error ( ) ;
}
Reference < ILogSystem > ILogSystem : : fromServerDBInfo ( UID const & dbgid , ServerDBInfo const & dbInfo ) {
return fromLogSystemConfig ( dbgid , dbInfo . myLocality , dbInfo . logSystemConfig ) ;
}