/*
 * masterserver.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <iterator>

#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/PerfMetric.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/BackupProgress.actor.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
#include "fdbserver/DBCoreState.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/ProxyCommitData.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Trace.h"

#include "flow/actorcompiler.h" // This must be the last #include.

using std::vector;
using std::min;
using std::max;

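// Remembers, for a single commit proxy, the GetCommitVersionReply already issued for each request number,
// so that retransmitted requests receive the same reply.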
struct CommitProxyVersionReplies {
	std::map<uint64_t, GetCommitVersionReply> replies;
	NotifiedVersion latestRequestNum;

	CommitProxyVersionReplies(CommitProxyVersionReplies&& r) noexcept
	  : replies(std::move(r.replies)), latestRequestNum(std::move(r.latestRequestNum)) {}
	void operator=(CommitProxyVersionReplies&& r) noexcept {
		replies = std::move(r.replies);
		latestRequestNum = std::move(r.latestRequestNum);
	}

	CommitProxyVersionReplies() : latestRequestNum(0) {}
};

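// Kills the master (via worker_removed()) if another process writes the coordinated state before this master is
// fully recovered; returns quietly once the state has been switched or recovery has completed.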
ACTOR Future<Void> masterTerminateOnConflict(UID dbgid, Promise<Void> fullyRecovered, Future<Void> onConflict,
                                             Future<Void> switchedState) {
	choose {
		when(wait(onConflict)) {
			if (!fullyRecovered.isSet()) {
				TraceEvent("MasterTerminated", dbgid).detail("Reason", "Conflict");
				TEST(true); // Coordinated state conflict, master dying
				throw worker_removed();
			}
			return Void();
		}
		when(wait(switchedState)) {
			return Void();
		}
	}
}

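// Wraps MovableCoordinatedState so it can be read once and written repeatedly during a recovery, re-arming the
// conflict watcher after every non-final write and terminating the master if the coordinated state changes
// underneath it.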
class ReusableCoordinatedState : NonCopyable {
public:
	Promise<Void> fullyRecovered;
	DBCoreState prevDBState;
	DBCoreState myDBState;
	bool finalWriteStarted;
	Future<Void> previousWrite;

	ReusableCoordinatedState(ServerCoordinators const& coordinators, PromiseStream<Future<Void>> const& addActor,
	                         UID const& dbgid)
	  : coordinators(coordinators), cstate(coordinators), addActor(addActor), dbgid(dbgid), finalWriteStarted(false),
	    previousWrite(Void()) {}

	Future<Void> read() { return _read(this); }

	Future<Void> write(DBCoreState newState, bool finalWrite = false) {
		previousWrite = _write(this, newState, finalWrite);
		return previousWrite;
	}

	Future<Void> move(ClusterConnectionString const& nc) { return cstate.move(nc); }

private:
	MovableCoordinatedState cstate;
	ServerCoordinators coordinators;
	PromiseStream<Future<Void>> addActor;
	Promise<Void> switchedState;
	UID dbgid;

	ACTOR Future<Void> _read(ReusableCoordinatedState* self) {
		Value prevDBStateRaw = wait(self->cstate.read());
		Future<Void> onConflict = masterTerminateOnConflict(self->dbgid, self->fullyRecovered,
		                                                    self->cstate.onConflict(), self->switchedState.getFuture());
		if (onConflict.isReady() && onConflict.isError()) {
			throw onConflict.getError();
		}
		self->addActor.send(onConflict);

		if (prevDBStateRaw.size()) {
			self->prevDBState = BinaryReader::fromStringRef<DBCoreState>(prevDBStateRaw, IncludeVersion());
			self->myDBState = self->prevDBState;
		}

		return Void();
	}

	ACTOR Future<Void> _write(ReusableCoordinatedState* self, DBCoreState newState, bool finalWrite) {
		if (self->finalWriteStarted) {
			wait(Future<Void>(Never()));
		}

		if (finalWrite) {
			self->finalWriteStarted = true;
		}

		try {
			wait(self->cstate.setExclusive(
			    BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withDBCoreState()))));
		} catch (Error& e) {
			TEST(true); // Master displaced during writeMasterState
			throw;
		}

		self->myDBState = newState;

		if (!finalWrite) {
			self->switchedState.send(Void());
			self->cstate = MovableCoordinatedState(self->coordinators);
			Value rereadDBStateRaw = wait(self->cstate.read());
			DBCoreState readState;
			if (rereadDBStateRaw.size())
				readState = BinaryReader::fromStringRef<DBCoreState>(rereadDBStateRaw, IncludeVersion());

			if (readState != newState) {
				TraceEvent("MasterTerminated", self->dbgid).detail("Reason", "CStateChanged");
				TEST(true); // Coordinated state changed between writing and reading, master dying
				throw worker_removed();
			}
			self->switchedState = Promise<Void>();
			self->addActor.send(masterTerminateOnConflict(self->dbgid, self->fullyRecovered, self->cstate.onConflict(),
			                                              self->switchedState.getFuture()));
		} else {
			self->fullyRecovered.send(Void());
		}

		return Void();
	}
};

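// All of the state used by a single master during one recovery attempt / epoch.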
struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
	UID dbgid;

	AsyncTrigger registrationTrigger;
	Version lastEpochEnd, // The last version in the old epoch not (to be) rolled back in this recovery
	    recoveryTransactionVersion; // The first version in this epoch
	double lastCommitTime;

	Version liveCommittedVersion; // The largest live committed version reported by commit proxies.
	bool databaseLocked;
	Optional<Value> proxyMetadataVersion;
	Version minKnownCommittedVersion;

	DatabaseConfiguration originalConfiguration;
	DatabaseConfiguration configuration;
	std::vector<Optional<Key>> primaryDcId;
	std::vector<Optional<Key>> remoteDcIds;
	bool hasConfiguration;

	ServerCoordinators coordinators;

	Reference<ILogSystem> logSystem;
	Version version; // The last version assigned to a proxy by getVersion()
	double lastVersionTime;
	LogSystemDiskQueueAdapter* txnStateLogAdapter;
	IKeyValueStore* txnStateStore;
	int64_t memoryLimit;
	std::map<Optional<Value>, int8_t> dcId_locality;
	std::vector<Tag> allTags;

	int8_t getNextLocality() {
		int8_t maxLocality = -1;
		for (auto it : dcId_locality) {
			maxLocality = std::max(maxLocality, it.second);
		}
		return maxLocality + 1;
	}

	std::vector<CommitProxyInterface> commitProxies;
	std::vector<CommitProxyInterface> provisionalCommitProxies;
	std::vector<GrvProxyInterface> grvProxies;
	std::vector<GrvProxyInterface> provisionalGrvProxies;
	std::vector<ResolverInterface> resolvers;

	std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;

	Standalone<StringRef> dbId;

	MasterInterface myInterface;
	const ClusterControllerFullInterface
	    clusterController; // If the cluster controller changes, this master will die, so this is immutable.

	ReusableCoordinatedState cstate;
	Promise<Void> cstateUpdated;
	Reference<AsyncVar<ServerDBInfo>> dbInfo;
	int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController

	RecoveryState recoveryState;

	AsyncVar<Standalone<VectorRef<ResolverMoveRef>>> resolverChanges;
	Version resolverChangesVersion;
	std::set<UID> resolverNeedingChanges;

	PromiseStream<Future<Void>> addActor;
	Reference<AsyncVar<bool>> recruitmentStalled;
	bool forceRecovery;
	bool neverCreated;
	int8_t safeLocality;
	int8_t primaryLocality;

	std::vector<WorkerInterface> backupWorkers; // Recruited backup workers from cluster controller.

	MasterData(Reference<AsyncVar<ServerDBInfo>> const& dbInfo, MasterInterface const& myInterface,
	           ServerCoordinators const& coordinators, ClusterControllerFullInterface const& clusterController,
	           Standalone<StringRef> const& dbId, PromiseStream<Future<Void>> const& addActor, bool forceRecovery)
	  : dbgid(myInterface.id()), myInterface(myInterface), dbInfo(dbInfo), cstate(coordinators, addActor, dbgid),
	    coordinators(coordinators), clusterController(clusterController), dbId(dbId), forceRecovery(forceRecovery),
	    safeLocality(tagLocalityInvalid), primaryLocality(tagLocalityInvalid), neverCreated(false),
	    lastEpochEnd(invalidVersion), liveCommittedVersion(invalidVersion), databaseLocked(false),
	    minKnownCommittedVersion(invalidVersion), recoveryTransactionVersion(invalidVersion), lastCommitTime(0),
	    registrationCount(0), version(invalidVersion), lastVersionTime(0), txnStateStore(0), memoryLimit(2e9),
	    addActor(addActor), hasConfiguration(false),
	    recruitmentStalled(Reference<AsyncVar<bool>>(new AsyncVar<bool>())) {
		if (forceRecovery && !myInterface.locality.dcId().present()) {
			TraceEvent(SevError, "ForcedRecoveryRequiresDcID");
			forceRecovery = false;
		}
	}
	~MasterData() { if (txnStateStore) txnStateStore->close(); }
};

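// Initializes the commit proxies recruited by the cluster controller for the new epoch.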
ACTOR Future<Void> newCommitProxies(Reference<MasterData> self, RecruitFromConfigurationReply recr) {
	vector<Future<CommitProxyInterface>> initializationReplies;
	for (int i = 0; i < recr.commitProxies.size(); i++) {
		InitializeCommitProxyRequest req;
		req.master = self->myInterface;
		req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
		req.recoveryTransactionVersion = self->recoveryTransactionVersion;
		req.firstProxy = i == 0;
		TraceEvent("CommitProxyReplies", self->dbgid).detail("WorkerID", recr.commitProxies[i].id());
		initializationReplies.push_back(
		    transformErrors(throwErrorOr(recr.commitProxies[i].commitProxy.getReplyUnlessFailedFor(
		                        req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
		                    master_recovery_failed()));
	}

	vector<CommitProxyInterface> newRecruits = wait(getAll(initializationReplies));
	// It is required for the correctness of COMMIT_ON_FIRST_PROXY that self->commitProxies[0] is the firstCommitProxy.
	self->commitProxies = newRecruits;

	return Void();
}

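// Initializes the GRV proxies recruited by the cluster controller for the new epoch.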
ACTOR Future<Void> newGrvProxies(Reference<MasterData> self, RecruitFromConfigurationReply recr) {
	vector<Future<GrvProxyInterface>> initializationReplies;
	for (int i = 0; i < recr.grvProxies.size(); i++) {
		InitializeGrvProxyRequest req;
		req.master = self->myInterface;
		req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
		TraceEvent("GrvProxyReplies", self->dbgid).detail("WorkerID", recr.grvProxies[i].id());
		initializationReplies.push_back(
		    transformErrors(throwErrorOr(recr.grvProxies[i].grvProxy.getReplyUnlessFailedFor(
		                        req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
		                    master_recovery_failed()));
	}

	vector<GrvProxyInterface> newRecruits = wait(getAll(initializationReplies));
	self->grvProxies = newRecruits;
	return Void();
}

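// Initializes the resolvers recruited by the cluster controller for the new epoch.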
ACTOR Future<Void> newResolvers(Reference<MasterData> self, RecruitFromConfigurationReply recr) {
	vector<Future<ResolverInterface>> initializationReplies;
	for (int i = 0; i < recr.resolvers.size(); i++) {
		InitializeResolverRequest req;
		req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
		req.commitProxyCount = recr.commitProxies.size();
		req.resolverCount = recr.resolvers.size();
		TraceEvent("ResolverReplies", self->dbgid).detail("WorkerID", recr.resolvers[i].id());
		initializationReplies.push_back(
		    transformErrors(throwErrorOr(recr.resolvers[i].resolver.getReplyUnlessFailedFor(
		                        req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
		                    master_recovery_failed()));
	}

	vector<ResolverInterface> newRecruits = wait(getAll(initializationReplies));
	self->resolvers = newRecruits;

	return Void();
}

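// Builds the new epoch's log system on the recruited tlogs. When more than one region is usable, this also
// assigns tag localities to previously unknown datacenters and recruits remote tlogs and log routers.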
ACTOR Future<Void> newTLogServers(Reference<MasterData> self, RecruitFromConfigurationReply recr,
                                  Reference<ILogSystem> oldLogSystem,
                                  vector<Standalone<CommitTransactionRef>>* initialConfChanges) {
	if (self->configuration.usableRegions > 1) {
		state Optional<Key> remoteDcId = self->remoteDcIds.size() ? self->remoteDcIds[0] : Optional<Key>();
		if (!self->dcId_locality.count(recr.dcId)) {
			int8_t loc = self->getNextLocality();
			Standalone<CommitTransactionRef> tr;
			tr.set(tr.arena(), tagLocalityListKeyFor(recr.dcId), tagLocalityListValue(loc));
			initialConfChanges->push_back(tr);
			self->dcId_locality[recr.dcId] = loc;
			TraceEvent(SevWarn, "UnknownPrimaryDCID", self->dbgid).detail("PrimaryId", recr.dcId).detail("Loc", loc);
		}

		if (!self->dcId_locality.count(remoteDcId)) {
			int8_t loc = self->getNextLocality();
			Standalone<CommitTransactionRef> tr;
			tr.set(tr.arena(), tagLocalityListKeyFor(remoteDcId), tagLocalityListValue(loc));
			initialConfChanges->push_back(tr);
			self->dcId_locality[remoteDcId] = loc;
			TraceEvent(SevWarn, "UnknownRemoteDCID", self->dbgid).detail("RemoteId", remoteDcId).detail("Loc", loc);
		}

		std::vector<UID> exclusionWorkerIds;
		std::transform(recr.tLogs.begin(), recr.tLogs.end(), std::back_inserter(exclusionWorkerIds),
		               [](const WorkerInterface& in) { return in.id(); });
		std::transform(recr.satelliteTLogs.begin(), recr.satelliteTLogs.end(), std::back_inserter(exclusionWorkerIds),
		               [](const WorkerInterface& in) { return in.id(); });
		Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers =
		    brokenPromiseToNever(self->clusterController.recruitRemoteFromConfiguration.getReply(
		        RecruitRemoteFromConfigurationRequest(
		            self->configuration, remoteDcId,
		            recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount /
		                                                     std::max<int>(1, recr.tLogs.size())),
		            exclusionWorkerIds)));

		self->primaryLocality = self->dcId_locality[recr.dcId];
		self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
		Reference<ILogSystem> newLogSystem =
		    wait(oldLogSystem->newEpoch(recr, fRemoteWorkers, self->configuration,
		                                self->cstate.myDBState.recoveryCount + 1, self->primaryLocality,
		                                self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled));
		self->logSystem = newLogSystem;
	} else {
		self->primaryLocality = tagLocalitySpecial;
		self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
		Reference<ILogSystem> newLogSystem =
		    wait(oldLogSystem->newEpoch(recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1,
		                                self->primaryLocality, tagLocalitySpecial, self->allTags,
		                                self->recruitmentStalled));
		self->logSystem = newLogSystem;
	}
	return Void();
}

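// For a brand new database (lastEpochEnd == 0), recruits and initializes the initial storage servers, assigning
// seed tags per datacenter.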
ACTOR Future<Void> newSeedServers(Reference<MasterData> self, RecruitFromConfigurationReply recruits,
                                  vector<StorageServerInterface>* servers) {
	// This is only necessary if the database is at version 0
	servers->clear();
	if (self->lastEpochEnd) return Void();

	state int idx = 0;
	state std::map<Optional<Value>, Tag> dcId_tags;
	state int8_t nextLocality = 0;
	while (idx < recruits.storageServers.size()) {
		TraceEvent("MasterRecruitingInitialStorageServer", self->dbgid)
		    .detail("CandidateWorker", recruits.storageServers[idx].locality.toString());

		InitializeStorageRequest isr;
		isr.seedTag = dcId_tags.count(recruits.storageServers[idx].locality.dcId())
		                  ? dcId_tags[recruits.storageServers[idx].locality.dcId()]
		                  : Tag(nextLocality, 0);
		isr.storeType = self->configuration.storageServerStoreType;
		isr.reqId = deterministicRandom()->randomUniqueID();
		isr.interfaceId = deterministicRandom()->randomUniqueID();

		ErrorOr<InitializeStorageReply> newServer = wait(recruits.storageServers[idx].storage.tryGetReply(isr));

		if (newServer.isError()) {
			if (!newServer.isError(error_code_recruitment_failed) &&
			    !newServer.isError(error_code_request_maybe_delivered))
				throw newServer.getError();

			TEST(true); // masterserver initial storage recruitment loop failed to get new server
			wait(delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY));
		} else {
			if (!dcId_tags.count(recruits.storageServers[idx].locality.dcId())) {
				dcId_tags[recruits.storageServers[idx].locality.dcId()] = Tag(nextLocality, 0);
				nextLocality++;
			}

			Tag& tag = dcId_tags[recruits.storageServers[idx].locality.dcId()];
			tag.id++;
			idx++;

			servers->push_back(newServer.get().interf);
		}
	}

	self->dcId_locality.clear();
	for (auto& it : dcId_tags) {
		self->dcId_locality[it.first] = it.second.locality;
	}

	TraceEvent("MasterRecruitedInitialStorageServers", self->dbgid)
	    .detail("TargetCount", self->configuration.storageTeamSize)
	    .detail("Servers", describe(*servers));

	return Void();
}

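// Resolves to commit_proxy_failed() as soon as any commit proxy is considered failed.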
Future<Void> waitCommitProxyFailure(vector<CommitProxyInterface> const& commitProxies) {
	std::vector<Future<Void>> failed;
	for (auto commitProxy : commitProxies) {
		failed.push_back(waitFailureClient(commitProxy.waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
		                                   -SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
		                                   /*trace=*/true));
	}
	ASSERT(failed.size() >= 1);
	return tagError<Void>(quorum(failed, 1), commit_proxy_failed());
}

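// Resolves to grv_proxy_failed() as soon as any GRV proxy is considered failed.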
Future<Void> waitGrvProxyFailure(vector<GrvProxyInterface> const& grvProxies) {
	vector<Future<Void>> failed;
	for (int i = 0; i < grvProxies.size(); i++)
		failed.push_back(waitFailureClient(grvProxies[i].waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
		                                   -SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
		                                   /*trace=*/true));
	ASSERT(failed.size() >= 1);
	return tagError<Void>(quorum(failed, 1), grv_proxy_failed());
}

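// Resolves to master_resolver_failed() as soon as any resolver is considered failed.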
Future<Void> waitResolverFailure(vector<ResolverInterface> const& resolvers) {
	std::vector<Future<Void>> failed;
	for (auto resolver : resolvers) {
		failed.push_back(waitFailureClient(resolver.waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
		                                   -SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
		                                   /*trace=*/true));
	}
	ASSERT(failed.size() >= 1);
	return tagError<Void>(quorum(failed, 1), master_resolver_failed());
}

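// Writes the current log system configuration to logsKey, unless the key no longer references any of this
// master's tlogs (meaning a newer master has already overwritten it).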
ACTOR Future<Void> updateLogsValue(Reference<MasterData> self, Database cx) {
	state Transaction tr(cx);
	loop {
		try {
			Optional<Standalone<StringRef>> value = wait(tr.get(logsKey));
			ASSERT(value.present());
			auto logs = decodeLogsValue(value.get());

			std::set<UID> logIds;
			for (auto& log : logs.first) {
				logIds.insert(log.first);
			}

			bool found = false;
			for (auto& logSet : self->logSystem->getLogSystemConfig().tLogs) {
				for (auto& log : logSet.tLogs) {
					if (logIds.count(log.id())) {
						found = true;
						break;
					}
				}
				if (found) {
					break;
				}
			}

			if (!found) {
				TEST(true); // old master attempted to change logsKey
				return Void();
			}

			tr.set(logsKey, self->logSystem->getLogsValue());
			wait(tr.commit());
			return Void();
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}

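// Sends a RegisterMasterRequest describing the current transaction subsystem to the cluster controller.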
Future<Void> sendMasterRegistration(MasterData* self, LogSystemConfig const& logSystemConfig,
                                    vector<CommitProxyInterface> commitProxies, vector<GrvProxyInterface> grvProxies,
                                    vector<ResolverInterface> resolvers, DBRecoveryCount recoveryCount,
                                    vector<UID> priorCommittedLogServers) {
	RegisterMasterRequest masterReq;
	masterReq.id = self->myInterface.id();
	masterReq.mi = self->myInterface.locality;
	masterReq.logSystemConfig = logSystemConfig;
	masterReq.commitProxies = commitProxies;
	masterReq.grvProxies = grvProxies;
	masterReq.resolvers = resolvers;
	masterReq.recoveryCount = recoveryCount;
	if (self->hasConfiguration) masterReq.configuration = self->configuration;
	masterReq.registrationCount = ++self->registrationCount;
	masterReq.priorCommittedLogServers = priorCommittedLogServers;
	masterReq.recoveryState = self->recoveryState;
	masterReq.recoveryStalled = self->recruitmentStalled->get();
	return brokenPromiseToNever(self->clusterController.registerMaster.getReply(masterReq));
}

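// Re-registers with the cluster controller whenever registrationTrigger fires, coalescing rapid changes; once the
// coordinated state has been updated it also refreshes logsKey via updateLogsValue().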
ACTOR Future<Void> updateRegistration(Reference<MasterData> self, Reference<ILogSystem> logSystem) {
	state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, true, true);
	state Future<Void> trigger = self->registrationTrigger.onTrigger();
	state Future<Void> updateLogsKey;

	loop {
		wait(trigger);
		wait(delay(.001)); // Coalesce multiple changes

		trigger = self->registrationTrigger.onTrigger();

		auto logSystemConfig = logSystem->getLogSystemConfig();
		TraceEvent("MasterUpdateRegistration", self->dbgid)
		    .detail("RecoveryCount", self->cstate.myDBState.recoveryCount)
		    .detail("OldestBackupEpoch", logSystemConfig.oldestBackupEpoch)
		    .detail("Logs", describe(logSystemConfig.tLogs));

		if (!self->cstateUpdated.isSet()) {
			wait(sendMasterRegistration(self.getPtr(), logSystemConfig, self->provisionalCommitProxies,
			                            self->provisionalGrvProxies, self->resolvers,
			                            self->cstate.myDBState.recoveryCount,
			                            self->cstate.prevDBState.getPriorCommittedLogServers()));
		} else {
			updateLogsKey = updateLogsValue(self, cx);
			wait(sendMasterRegistration(self.getPtr(), logSystemConfig, self->commitProxies, self->grvProxies,
			                            self->resolvers, self->cstate.myDBState.recoveryCount, vector<UID>()));
		}
	}
}

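// Runs provisional (fake) commit and GRV proxies while recovery is stalled: serves read versions at the old
// epoch's end and waits for a single write-only "emergency" transaction that may repair the configuration, which
// is returned to the caller.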
ACTOR Future<Standalone<CommitTransactionRef>> provisionalMaster(Reference<MasterData> parent, Future<Void> activate) {
	wait(activate);

	// Register a fake commit proxy (to be provided right here) to make ourselves available to clients
	parent->provisionalCommitProxies = vector<CommitProxyInterface>(1);
	parent->provisionalCommitProxies[0].provisional = true;
	parent->provisionalCommitProxies[0].initEndpoints();
	parent->provisionalGrvProxies = vector<GrvProxyInterface>(1);
	parent->provisionalGrvProxies[0].provisional = true;
	parent->provisionalGrvProxies[0].initEndpoints();
	state Future<Void> waitCommitProxyFailure =
	    waitFailureServer(parent->provisionalCommitProxies[0].waitFailure.getFuture());
	state Future<Void> waitGrvProxyFailure =
	    waitFailureServer(parent->provisionalGrvProxies[0].waitFailure.getFuture());
	parent->registrationTrigger.trigger();

	auto lockedKey = parent->txnStateStore->readValue(databaseLockedKey).get();
	state bool locked = lockedKey.present() && lockedKey.get().size();

	state Optional<Value> metadataVersion = parent->txnStateStore->readValue(metadataVersionKey).get();

	// We respond to a minimal subset of the commit proxy protocol.  Our sole purpose is to receive a single
	// write-only transaction which might repair our configuration, and return it.
	loop choose {
		when(GetReadVersionRequest req =
		         waitNext(parent->provisionalGrvProxies[0].getConsistentReadVersion.getFuture())) {
			if (req.flags & GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY && parent->lastEpochEnd) {
				GetReadVersionReply rep;
				rep.version = parent->lastEpochEnd;
				rep.locked = locked;
				rep.metadataVersion = metadataVersion;
				req.reply.send(rep);
			} else
				req.reply.send(Never()); // We can't perform causally consistent reads without recovering
		}
		when(CommitTransactionRequest req = waitNext(parent->provisionalCommitProxies[0].commit.getFuture())) {
			req.reply.send(Never()); // don't reply (clients always get commit_unknown_result)
			auto t = &req.transaction;
			if (t->read_snapshot == parent->lastEpochEnd && //< So no transactions can fall between the read snapshot and the recovery transaction this (might) be merged with
			    // vvv and also the changes we will make in the recovery transaction (most notably to lastEpochEndKey) BEFORE we merge initialConfChanges won't conflict
			    !std::any_of(t->read_conflict_ranges.begin(), t->read_conflict_ranges.end(),
			                 [](KeyRangeRef const& r) { return r.contains(lastEpochEndKey); })) {
				for (auto m = t->mutations.begin(); m != t->mutations.end(); ++m) {
					TraceEvent("PM_CTM", parent->dbgid)
					    .detail("MType", m->type)
					    .detail("Param1", m->param1)
					    .detail("Param2", m->param2);
					if (isMetadataMutation(*m)) {
						// We keep the mutations and write conflict ranges from this transaction, but not its read
						// conflict ranges
						Standalone<CommitTransactionRef> out;
						out.read_snapshot = invalidVersion;
						out.mutations.append_deep(out.arena(), t->mutations.begin(), t->mutations.size());
						out.write_conflict_ranges.append_deep(out.arena(), t->write_conflict_ranges.begin(),
						                                      t->write_conflict_ranges.size());
						return out;
					}
				}
			}
		}
		when(GetKeyServerLocationsRequest req =
		         waitNext(parent->provisionalCommitProxies[0].getKeyServersLocations.getFuture())) {
			req.reply.send(Never());
		}
		when(wait(waitCommitProxyFailure)) { throw worker_removed(); }
		when(wait(waitGrvProxyFailure)) { throw worker_removed(); }
	}
}

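// Recruits and initializes the whole transaction subsystem for the new epoch (commit proxies, GRV proxies,
// resolvers, tlogs, and seed storage servers for a new database), returning any configuration changes produced
// during recruitment.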
ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything(Reference<MasterData> self,
                                                                         vector<StorageServerInterface>* seedServers,
                                                                         Reference<ILogSystem> oldLogSystem) {
	if (!self->configuration.isValid()) {
		RecoveryStatus::RecoveryStatus status;
		if (self->configuration.initialized) {
			TraceEvent(SevWarn, "MasterRecoveryInvalidConfiguration", self->dbgid)
			    .setMaxEventLength(11000)
			    .setMaxFieldLength(10000)
			    .detail("Conf", self->configuration.toString());
			status = RecoveryStatus::configuration_invalid;
		} else if (!self->cstate.prevDBState.tLogs.size()) {
			status = RecoveryStatus::configuration_never_created;
			self->neverCreated = true;
		} else {
			status = RecoveryStatus::configuration_missing;
		}
		TraceEvent("MasterRecoveryState", self->dbgid)
		    .detail("StatusCode", status)
		    .detail("Status", RecoveryStatus::names[status])
		    .trackLatest("MasterRecoveryState");
		return Never();
	} else
		TraceEvent("MasterRecoveryState", self->dbgid)
		    .detail("StatusCode", RecoveryStatus::recruiting_transaction_servers)
		    .detail("Status", RecoveryStatus::names[RecoveryStatus::recruiting_transaction_servers])
		    .detail("RequiredTLogs", self->configuration.tLogReplicationFactor)
		    .detail("DesiredTLogs", self->configuration.getDesiredLogs())
		    .detail("RequiredCommitProxies", 1)
		    .detail("DesiredCommitProxies", self->configuration.getDesiredCommitProxies())
		    .detail("RequiredGrvProxies", 1)
		    .detail("DesiredGrvProxies", self->configuration.getDesiredGrvProxies())
		    .detail("RequiredResolvers", 1)
		    .detail("DesiredResolvers", self->configuration.getDesiredResolvers())
		    .detail("StoreType", self->configuration.storageServerStoreType)
		    .trackLatest("MasterRecoveryState");

	// FIXME: we only need log routers for the same locality as the master
	int maxLogRouters = self->cstate.prevDBState.logRouterTags;
	for (auto& old : self->cstate.prevDBState.oldTLogData) {
		maxLogRouters = std::max(maxLogRouters, old.logRouterTags);
	}

	state RecruitFromConfigurationReply recruits =
	    wait(brokenPromiseToNever(self->clusterController.recruitFromConfiguration.getReply(
	        RecruitFromConfigurationRequest(self->configuration, self->lastEpochEnd == 0, maxLogRouters))));

	self->primaryDcId.clear();
	self->remoteDcIds.clear();
	if (recruits.dcId.present()) {
		self->primaryDcId.push_back(recruits.dcId);
		if (self->configuration.regions.size() > 1) {
			self->remoteDcIds.push_back(recruits.dcId.get() == self->configuration.regions[0].dcId
			                                ? self->configuration.regions[1].dcId
			                                : self->configuration.regions[0].dcId);
		}
	}
	self->backupWorkers.swap(recruits.backupWorkers);

	TraceEvent("MasterRecoveryState", self->dbgid)
	    .detail("StatusCode", RecoveryStatus::initializing_transaction_servers)
	    .detail("Status", RecoveryStatus::names[RecoveryStatus::initializing_transaction_servers])
	    .detail("CommitProxies", recruits.commitProxies.size())
	    .detail("GrvProxies", recruits.grvProxies.size())
	    .detail("TLogs", recruits.tLogs.size())
	    .detail("Resolvers", recruits.resolvers.size())
	    .detail("BackupWorkers", self->backupWorkers.size())
	    .trackLatest("MasterRecoveryState");

	// Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a
	// brand new database we are sort of lying that we are past the recruitment phase.  In a perfect world we would
	// split that up so that the recruitment part happens above (in parallel with recruiting the transaction
	// servers?).
	wait(newSeedServers(self, recruits, seedServers));
	state vector<Standalone<CommitTransactionRef>> confChanges;
	wait(newCommitProxies(self, recruits) && newGrvProxies(self, recruits) && newResolvers(self, recruits) &&
	     newTLogServers(self, recruits, oldLogSystem, &confChanges));
	return confChanges;
}

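// Keeps the PeekTxsInfo for the given datacenter up to date as the old log system's configuration and known
// committed version change.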
ACTOR Future<Void> updateLocalityForDcId(Optional<Key> dcId, Reference<ILogSystem> oldLogSystem,
                                         Reference<AsyncVar<PeekTxsInfo>> locality) {
	loop {
		std::pair<int8_t, int8_t> loc = oldLogSystem->getLogSystemConfig().getLocalityForDcId(dcId);
		Version ver = locality->get().knownCommittedVersion;
		if (ver == invalidVersion) {
			ver = oldLogSystem->getKnownCommittedVersion();
		}
		locality->set(PeekTxsInfo(loc.first, loc.second, ver));
		TraceEvent("UpdatedLocalityForDcId")
		    .detail("DcId", dcId)
		    .detail("Locality0", loc.first)
		    .detail("Locality1", loc.second)
		    .detail("Version", ver);
		wait(oldLogSystem->onLogSystemConfigChange() || oldLogSystem->onKnownCommittedVersionChange());
	}
}

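// Recovers the transaction state store from the old log system and derives lastEpochEnd,
// recoveryTransactionVersion, the database configuration, tag localities, and the full set of tags from it.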
ACTOR Future<Void> readTransactionSystemState(Reference<MasterData> self, Reference<ILogSystem> oldLogSystem,
                                              Version txsPoppedVersion) {
	state Reference<AsyncVar<PeekTxsInfo>> myLocality = Reference<AsyncVar<PeekTxsInfo>>(
	    new AsyncVar<PeekTxsInfo>(PeekTxsInfo(tagLocalityInvalid, tagLocalityInvalid, invalidVersion)));
	state Future<Void> localityUpdater =
	    updateLocalityForDcId(self->myInterface.locality.dcId(), oldLogSystem, myLocality);

	// Peek the txnStateTag in oldLogSystem and recover self->txnStateStore
	// For now, we also obtain the recovery metadata that the log system obtained during the end_epoch process for
	// comparison

	// Sets self->lastEpochEnd and self->recoveryTransactionVersion
	// Sets self->configuration to the configuration (FF/conf/ keys) at self->lastEpochEnd

	// Recover transaction state store
	if (self->txnStateStore) self->txnStateStore->close();
	self->txnStateLogAdapter = openDiskQueueAdapter(oldLogSystem, myLocality, txsPoppedVersion);
	self->txnStateStore =
	    keyValueStoreLogSystem(self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false, true);

	// Versionstamped operations (particularly those applied from DR) define a minimum commit version
	// that we may recover to, as they embed the version in user-readable data and require that no
	// transactions will be committed at a lower version.
	Optional<Standalone<StringRef>> requiredCommitVersion =
	    wait(self->txnStateStore->readValue(minRequiredCommitVersionKey));
	Version minRequiredCommitVersion = -1;
	if (requiredCommitVersion.present()) {
		minRequiredCommitVersion = BinaryReader::fromStringRef<Version>(requiredCommitVersion.get(), Unversioned());
	}

	// Recover version info
	self->lastEpochEnd = oldLogSystem->getEnd() - 1;
	if (self->lastEpochEnd == 0) {
		self->recoveryTransactionVersion = 1;
	} else {
		if (self->forceRecovery) {
			self->recoveryTransactionVersion = self->lastEpochEnd + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT_FORCED;
		} else {
			self->recoveryTransactionVersion = self->lastEpochEnd + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT;
		}

		if (BUGGIFY) {
			self->recoveryTransactionVersion +=
			    deterministicRandom()->randomInt64(0, SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT);
		}
		if (self->recoveryTransactionVersion < minRequiredCommitVersion)
			self->recoveryTransactionVersion = minRequiredCommitVersion;
	}

	TraceEvent("MasterRecovering", self->dbgid)
	    .detail("LastEpochEnd", self->lastEpochEnd)
	    .detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);

	Standalone<RangeResultRef> rawConf = wait(self->txnStateStore->readRange(configKeys));
	self->configuration.fromKeyValues(rawConf.castTo<VectorRef<KeyValueRef>>());
	self->originalConfiguration = self->configuration;
	self->hasConfiguration = true;

	TraceEvent("MasterRecoveredConfig", self->dbgid)
	    .setMaxEventLength(11000)
	    .setMaxFieldLength(10000)
	    .detail("Conf", self->configuration.toString())
	    .trackLatest("RecoveredConfig");

	Standalone<RangeResultRef> rawLocalities = wait(self->txnStateStore->readRange(tagLocalityListKeys));
	self->dcId_locality.clear();
	for (auto& kv : rawLocalities) {
		self->dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
	}

	Standalone<RangeResultRef> rawTags = wait(self->txnStateStore->readRange(serverTagKeys));
	self->allTags.clear();
	if (self->lastEpochEnd > 0) {
		self->allTags.push_back(cacheTag);
	}

	if (self->forceRecovery) {
		self->safeLocality = oldLogSystem->getLogSystemConfig().tLogs[0].locality;
		for (auto& kv : rawTags) {
			Tag tag = decodeServerTagValue(kv.value);
			if (tag.locality == self->safeLocality) {
				self->allTags.push_back(tag);
			}
		}
	} else {
		for (auto& kv : rawTags) {
			self->allTags.push_back(decodeServerTagValue(kv.value));
		}
	}

	Standalone<RangeResultRef> rawHistoryTags = wait(self->txnStateStore->readRange(serverTagHistoryKeys));
	for (auto& kv : rawHistoryTags) {
		self->allTags.push_back(decodeServerTagValue(kv.value));
	}
	uniquify(self->allTags);

	// auto kvs = self->txnStateStore->readRange( systemKeys );
	// for( auto & kv : kvs.get() )
	//	TraceEvent("MasterRecoveredTXS", self->dbgid).detail("K", kv.key).detail("V", kv.value);

	self->txnStateLogAdapter->setNextVersion(oldLogSystem->getEnd()); //< FIXME: (1) the log adapter should do this automatically after recovery; (2) if we make KeyValueStoreMemory guarantee immediate reads, we should be able to get rid of the discardCommit() below and not need a writable log adapter

	TraceEvent("RTSSComplete", self->dbgid);

	return Void();
}

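// Streams the recovered transaction state store to the commit proxies in bounded chunks and then initializes the
// resolvers at lastEpochEnd.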
ACTOR Future<Void> sendInitialCommitToResolvers(Reference<MasterData> self) {
	state KeyRange txnKeys = allKeys;
	state Sequence txnSequence = 0;
	ASSERT(self->recoveryTransactionVersion);

	state Standalone<RangeResultRef> data =
	    self->txnStateStore
	        ->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)
	        .get();
	state std::vector<Future<Void>> txnReplies;
	state int64_t dataOutstanding = 0;

	state std::vector<Endpoint> endpoints;
	for (auto& it : self->commitProxies) {
		endpoints.push_back(it.txnState.getEndpoint());
	}

	loop {
		if (!data.size()) break;
		((KeyRangeRef&)txnKeys) = KeyRangeRef(keyAfter(data.back().key, txnKeys.arena()), txnKeys.end);
		Standalone<RangeResultRef> nextData =
		    self->txnStateStore
		        ->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)
		        .get();

		TxnStateRequest req;
		req.arena = data.arena();
		req.data = data;
		req.sequence = txnSequence;
		req.last = !nextData.size();
		req.broadcastInfo = endpoints;
		txnReplies.push_back(broadcastTxnRequest(req, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, false));
		dataOutstanding += SERVER_KNOBS->TXN_STATE_SEND_AMOUNT * data.arena().getSize();
		data = nextData;
		txnSequence++;

		if (dataOutstanding > SERVER_KNOBS->MAX_TXS_SEND_MEMORY) {
			wait(waitForAll(txnReplies));
			txnReplies = vector<Future<Void>>();
			dataOutstanding = 0;
		}

		wait(yield());
	}
	wait(waitForAll(txnReplies));

	vector<Future<ResolveTransactionBatchReply>> replies;
	for (auto& r : self->resolvers) {
		ResolveTransactionBatchRequest req;
		req.prevVersion = -1;
		req.version = self->lastEpochEnd;
		req.lastReceivedVersion = -1;

		replies.push_back(brokenPromiseToNever(r.resolve.getReply(req)));
	}

	wait(waitForAll(replies));
	return Void();
}

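// Fires the registration trigger whenever the old log system's configuration or the recruitment-stalled flag
// changes, until recovery is fully complete.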
ACTOR Future<Void> triggerUpdates(Reference<MasterData> self, Reference<ILogSystem> oldLogSystem) {
	loop {
		wait(oldLogSystem->onLogSystemConfigChange() || self->cstate.fullyRecovered.getFuture() ||
		     self->recruitmentStalled->onChange());
		if (self->cstate.fullyRecovered.isSet())
			return Void();

		self->registrationTrigger.trigger();
	}
}

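// Completes a pending txnStateStore commit by acknowledging the disk queue adapter's commit message without
// writing it anywhere, effectively discarding it.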
ACTOR Future<Void> discardCommit(IKeyValueStore* store, LogSystemDiskQueueAdapter* adapter) {
	state Future<LogSystemDiskQueueAdapter::CommitMessage> fcm = adapter->getCommitMessage();
	state Future<Void> committed = store->commit();
	LogSystemDiskQueueAdapter::CommitMessage cm = wait(fcm);
	ASSERT(!committed.isReady());
	cm.acknowledge.send(Void());
	ASSERT(committed.isReady());
	return Void();
}

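// During forced recovery, promotes the master's own datacenter to the primary region, demotes all others, and
// drops usable_regions to 1, recording the corresponding mutations in initialConfChanges.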
void updateConfigForForcedRecovery(Reference<MasterData> self,
                                   vector<Standalone<CommitTransactionRef>>* initialConfChanges) {
	bool regionsChanged = false;
	for (auto& it : self->configuration.regions) {
		if (it.dcId == self->myInterface.locality.dcId().get() && it.priority < 0) {
			it.priority = 1;
			regionsChanged = true;
		} else if (it.dcId != self->myInterface.locality.dcId().get() && it.priority >= 0) {
			it.priority = -1;
			regionsChanged = true;
		}
	}
	Standalone<CommitTransactionRef> regionCommit;
	regionCommit.mutations.push_back_deep(
	    regionCommit.arena(),
	    MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "usable_regions", LiteralStringRef("1")));
	self->configuration.applyMutation(regionCommit.mutations.back());
	if (regionsChanged) {
		std::sort(self->configuration.regions.begin(), self->configuration.regions.end(),
		          RegionInfo::sort_by_priority());
		StatusObject regionJSON;
		regionJSON["regions"] = self->configuration.getRegionJSON();
		regionCommit.mutations.push_back_deep(
		    regionCommit.arena(),
		    MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions",
		                BinaryWriter::toValue(regionJSON, IncludeVersion(ProtocolVersion::withRegionConfiguration()))
		                    .toString()));
		self->configuration.applyMutation(regionCommit.mutations.back()); // modifying the configuration directly does not change the configuration when it is re-serialized unless we call applyMutation
		TraceEvent("ForcedRecoveryConfigChange", self->dbgid)
		    .setMaxEventLength(11000)
		    .setMaxFieldLength(10000)
		    .detail("Conf", self->configuration.toString());
	}
	initialConfChanges->push_back(regionCommit);
}

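// Reads the old transaction system state and recruits the new one; if recruitment stalls, a provisional master is
// started and an emergency configuration-change transaction may cause recruitment to be restarted.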
ACTOR Future<Void> recoverFrom(Reference<MasterData> self, Reference<ILogSystem> oldLogSystem,
                               vector<StorageServerInterface>* seedServers,
                               vector<Standalone<CommitTransactionRef>>* initialConfChanges,
                               Future<Version> poppedTxsVersion) {
	TraceEvent("MasterRecoveryState", self->dbgid)
	    .detail("StatusCode", RecoveryStatus::reading_transaction_system_state)
	    .detail("Status", RecoveryStatus::names[RecoveryStatus::reading_transaction_system_state])
	    .trackLatest("MasterRecoveryState");
	self->hasConfiguration = false;

	if (BUGGIFY)
		wait(delay(10.0));

	Version txsPoppedVersion = wait(poppedTxsVersion);
	wait(readTransactionSystemState(self, oldLogSystem, txsPoppedVersion));
	for (auto& itr : *initialConfChanges) {
		for (auto& m : itr.mutations) {
			self->configuration.applyMutation(m);
		}
	}

	if (self->forceRecovery) {
		updateConfigForForcedRecovery(self, initialConfChanges);
	}

	debug_checkMaxRestoredVersion(UID(), self->lastEpochEnd, "DBRecovery");

	// Ordinarily we pass through this loop once and recover.  We go around the loop if recovery stalls for more than
	// a second, a provisional master is initialized, and an "emergency transaction" is submitted that might change
	// the configuration so that we can finish recovery.

	state std::map<Optional<Value>, int8_t> originalLocalityMap = self->dcId_locality;
	state Future<vector<Standalone<CommitTransactionRef>>> recruitments =
	    recruitEverything(self, seedServers, oldLogSystem);
	state double provisionalDelay = SERVER_KNOBS->PROVISIONAL_START_DELAY;
	loop {
		state Future<Standalone<CommitTransactionRef>> provisional = provisionalMaster(self, delay(provisionalDelay));
		provisionalDelay =
		    std::min(SERVER_KNOBS->PROVISIONAL_MAX_DELAY, provisionalDelay * SERVER_KNOBS->PROVISIONAL_DELAY_GROWTH);
		choose {
			when(vector<Standalone<CommitTransactionRef>> confChanges = wait(recruitments)) {
				initialConfChanges->insert(initialConfChanges->end(), confChanges.begin(), confChanges.end());
				provisional.cancel();
				break;
			}
			when(Standalone<CommitTransactionRef> _req = wait(provisional)) {
				state Standalone<CommitTransactionRef> req = _req; // mutable
				TEST(true); // Emergency transaction processing during recovery
				TraceEvent("EmergencyTransaction", self->dbgid);
				for (auto m = req.mutations.begin(); m != req.mutations.end(); ++m)
					TraceEvent("EmergencyTransactionMutation", self->dbgid)
					    .detail("MType", m->type)
					    .detail("P1", m->param1)
					    .detail("P2", m->param2);

				DatabaseConfiguration oldConf = self->configuration;
				self->configuration = self->originalConfiguration;
				for (auto& m : req.mutations)
					self->configuration.applyMutation(m);

				initialConfChanges->clear();
				if (self->originalConfiguration.isValid() &&
				    self->configuration.usableRegions != self->originalConfiguration.usableRegions) {
					TraceEvent(SevWarnAlways, "CannotChangeUsableRegions", self->dbgid);
					self->configuration = self->originalConfiguration;
				} else {
					initialConfChanges->push_back(req);
				}
				if (self->forceRecovery) {
					updateConfigForForcedRecovery(self, initialConfChanges);
				}

				if (self->configuration != oldConf) { // confChange does not trigger when including servers
					self->dcId_locality = originalLocalityMap;
					recruitments = recruitEverything(self, seedServers, oldLogSystem);
				}
			}
		}

		provisional.cancel();
	}

	return Void();
}

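// Answers a commit proxy's GetCommitVersionRequest, advancing the master's commit version and re-sending the
// stored reply for duplicate request numbers.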
ACTOR Future < Void > getVersion ( Reference < MasterData > self , GetCommitVersionRequest req ) {
2020-07-08 00:06:13 +08:00
state Span span ( " M:getVersion " _loc , { req . spanContext } ) ;
2020-09-16 13:29:49 +08:00
state std : : map < UID , CommitProxyVersionReplies > : : iterator proxyItr = self - > lastCommitProxyVersionReplies . find ( req . requestingProxy ) ; // lastCommitProxyVersionReplies never changes
2017-05-26 04:48:44 +08:00
2020-09-16 13:29:49 +08:00
if ( proxyItr = = self - > lastCommitProxyVersionReplies . end ( ) ) {
2017-05-26 04:48:44 +08:00
// Request from invalid proxy (e.g. from duplicate recruitment request)
req . reply . send ( Never ( ) ) ;
return Void ( ) ;
}
TEST ( proxyItr - > second . latestRequestNum . get ( ) < req . requestNum - 1 ) ; // Commit version request queued up
2018-08-11 04:57:10 +08:00
wait ( proxyItr - > second . latestRequestNum . whenAtLeast ( req . requestNum - 1 ) ) ;
2017-05-26 04:48:44 +08:00
auto itr = proxyItr - > second . replies . find ( req . requestNum ) ;
if ( itr ! = proxyItr - > second . replies . end ( ) ) {
TEST ( true ) ; // Duplicate request for sequence
req . reply . send ( itr - > second ) ;
}
else if ( req . requestNum < = proxyItr - > second . latestRequestNum . get ( ) ) {
TEST ( true ) ; // Old request for previously acknowledged sequence - may be impossible with current FlowTransport implementation
ASSERT ( req . requestNum < proxyItr - > second . latestRequestNum . get ( ) ) ; // The latest request can never be acknowledged
req . reply . send ( Never ( ) ) ;
}
else {
GetCommitVersionReply rep ;
if ( self - > version = = invalidVersion ) {
self - > lastVersionTime = now ( ) ;
self - > version = self - > recoveryTransactionVersion ;
rep . prevVersion = self - > lastEpochEnd ;
}
else {
double t1 = now ( ) ;
if ( BUGGIFY ) {
t1 = self - > lastVersionTime ;
}
rep . prevVersion = self - > version ;
2018-07-01 21:39:04 +08:00
self - > version + = std : : max < Version > ( 1 , std : : min < Version > ( SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS , SERVER_KNOBS - > VERSIONS_PER_SECOND * ( t1 - self - > lastVersionTime ) ) ) ;
2017-05-26 04:48:44 +08:00
TEST ( self - > version - rep . prevVersion = = 1 ) ; // Minimum possible version gap
TEST ( self - > version - rep . prevVersion = = SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) ; // Maximum possible version gap
self - > lastVersionTime = t1 ;
if ( self - > resolverNeedingChanges . count ( req . requestingProxy ) ) {
rep . resolverChanges = self - > resolverChanges . get ( ) ;
rep . resolverChangesVersion = self - > resolverChangesVersion ;
self - > resolverNeedingChanges . erase ( req . requestingProxy ) ;
if ( self - > resolverNeedingChanges . empty ( ) )
self - > resolverChanges . set ( Standalone < VectorRef < ResolverMoveRef > > ( ) ) ;
}
}
rep . version = self - > version ;
rep . requestNum = req . requestNum ;
proxyItr - > second . replies . erase ( proxyItr - > second . replies . begin ( ) , proxyItr - > second . replies . upper_bound ( req . mostRecentProcessedRequestNum ) ) ;
proxyItr - > second . replies [ req . requestNum ] = rep ;
ASSERT ( rep . prevVersion > = 0 ) ;
req . reply . send ( rep ) ;
ASSERT ( proxyItr - > second . latestRequestNum . get ( ) = = req . requestNum - 1 ) ;
proxyItr - > second . latestRequestNum . set ( req . requestNum ) ;
}
return Void ( ) ;
}
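// Serves GetCommitVersionRequests from the commit proxies for the lifetime of the master, spawning one
// getVersion() actor per request.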
ACTOR Future < Void > provideVersions ( Reference < MasterData > self ) {
state ActorCollection versionActors ( false ) ;
2020-09-16 13:29:49 +08:00
for ( auto & p : self - > commitProxies ) self - > lastCommitProxyVersionReplies [ p . id ( ) ] = CommitProxyVersionReplies ( ) ;
2017-05-26 04:48:44 +08:00
loop {
choose {
when ( GetCommitVersionRequest req = waitNext ( self - > myInterface . getCommitVersion . getFuture ( ) ) ) {
versionActors . add ( getVersion ( self , req ) ) ;
}
2018-08-11 04:57:10 +08:00
when ( wait ( versionActors . getResult ( ) ) ) { }
2017-05-26 04:48:44 +08:00
}
}
}
2020-06-10 02:09:46 +08:00
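// Answers getLiveCommittedVersion requests (used by the GRV proxies) and records the largest committed
// version reported via reportLiveCommittedVersion by the commit proxies, along with the database lock
// state, metadata version, and minimum known committed version.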
ACTOR Future < Void > serveLiveCommittedVersion ( Reference < MasterData > self ) {
loop {
choose {
2020-06-11 06:55:23 +08:00
when ( GetRawCommittedVersionRequest req = waitNext ( self - > myInterface . getLiveCommittedVersion . getFuture ( ) ) ) {
if ( req . debugID . present ( ) )
g_traceBatch . addEvent ( " TransactionDebug " , req . debugID . get ( ) . first ( ) , " MasterServer.serveLiveCommittedVersion.GetRawCommittedVersion " ) ;
2020-06-17 11:37:30 +08:00
if ( self - > liveCommittedVersion = = invalidVersion ) {
2020-06-10 02:09:46 +08:00
self - > liveCommittedVersion = self - > recoveryTransactionVersion ;
}
2020-07-15 15:37:41 +08:00
GetRawCommittedVersionReply reply ;
2020-06-11 06:55:23 +08:00
reply . version = self - > liveCommittedVersion ;
2020-06-12 05:07:37 +08:00
reply . locked = self - > databaseLocked ;
2020-06-11 06:55:23 +08:00
reply . metadataVersion = self - > proxyMetadataVersion ;
2020-07-15 15:37:41 +08:00
reply . minKnownCommittedVersion = self - > minKnownCommittedVersion ;
2020-06-11 06:55:23 +08:00
req . reply . send ( reply ) ;
2020-06-10 02:09:46 +08:00
}
2020-06-11 06:55:23 +08:00
when ( ReportRawCommittedVersionRequest req = waitNext ( self - > myInterface . reportLiveCommittedVersion . getFuture ( ) ) ) {
2020-07-15 15:37:41 +08:00
self - > minKnownCommittedVersion = std : : max ( self - > minKnownCommittedVersion , req . minKnownCommittedVersion ) ;
2020-06-11 06:55:23 +08:00
if ( req . version > self - > liveCommittedVersion ) {
self - > liveCommittedVersion = req . version ;
2020-06-12 05:07:37 +08:00
self - > databaseLocked = req . locked ;
2020-06-11 06:55:23 +08:00
self - > proxyMetadataVersion = req . metadataVersion ;
}
2020-06-10 11:47:34 +08:00
req . reply . send ( Void ( ) ) ;
2020-06-10 02:09:46 +08:00
}
}
}
}
2017-05-26 04:48:44 +08:00
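// Picks a key range currently assigned to resolver 'src' that can be handed to resolver 'dest'.
// Prefers ranges that extend an existing src/dest boundary, then ranges that create a new boundary,
// and throws operation_failed() if every candidate range has already been scheduled for a move.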
std : : pair < KeyRangeRef , bool > findRange ( CoalescedKeyRangeMap < int > & key_resolver , Standalone < VectorRef < ResolverMoveRef > > & movedRanges , int src , int dest ) {
auto ranges = key_resolver . ranges ( ) ;
auto prev = ranges . begin ( ) ;
auto it = ranges . begin ( ) ;
+ + it ;
if ( it = = ranges . end ( ) ) {
if ( ranges . begin ( ) . value ( ) ! = src | | std : : find ( movedRanges . begin ( ) , movedRanges . end ( ) , ResolverMoveRef ( ranges . begin ( ) - > range ( ) , dest ) ) ! = movedRanges . end ( ) )
throw operation_failed ( ) ;
return std : : make_pair ( ranges . begin ( ) . range ( ) , true ) ;
}
std : : set < int > borders ;
//If possible expand an existing boundary between the two resolvers
for ( ; it ! = ranges . end ( ) ; + + it ) {
if ( it - > value ( ) = = src & & prev - > value ( ) = = dest & & std : : find ( movedRanges . begin ( ) , movedRanges . end ( ) , ResolverMoveRef ( it - > range ( ) , dest ) ) = = movedRanges . end ( ) ) {
return std : : make_pair ( it - > range ( ) , true ) ;
}
if ( it - > value ( ) = = dest & & prev - > value ( ) = = src & & std : : find ( movedRanges . begin ( ) , movedRanges . end ( ) , ResolverMoveRef ( prev - > range ( ) , dest ) ) = = movedRanges . end ( ) ) {
return std : : make_pair ( prev - > range ( ) , false ) ;
}
if ( it - > value ( ) = = dest )
borders . insert ( prev - > value ( ) ) ;
if ( prev - > value ( ) = = dest )
borders . insert ( it - > value ( ) ) ;
+ + prev ;
}
prev = ranges . begin ( ) ;
it = ranges . begin ( ) ;
+ + it ;
// If possible create a new boundary which doesn't exist yet
for ( ; it ! = ranges . end ( ) ; + + it ) {
if ( it - > value ( ) = = src & & ! borders . count ( prev - > value ( ) ) & & std : : find ( movedRanges . begin ( ) , movedRanges . end ( ) , ResolverMoveRef ( it - > range ( ) , dest ) ) = = movedRanges . end ( ) ) {
return std : : make_pair ( it - > range ( ) , true ) ;
}
if ( prev - > value ( ) = = src & & ! borders . count ( it - > value ( ) ) & & std : : find ( movedRanges . begin ( ) , movedRanges . end ( ) , ResolverMoveRef ( prev - > range ( ) , dest ) ) = = movedRanges . end ( ) ) {
return std : : make_pair ( prev - > range ( ) , false ) ;
}
+ + prev ;
}
it = ranges . begin ( ) ;
for ( ; it ! = ranges . end ( ) ; + + it ) {
if ( it - > value ( ) = = src & & std : : find ( movedRanges . begin ( ) , movedRanges . end ( ) , ResolverMoveRef ( it - > range ( ) , dest ) ) = = movedRanges . end ( ) ) {
return std : : make_pair ( it - > range ( ) , true ) ;
}
}
throw operation_failed(); // we are already attempting to move all of the data assigned to the source resolver, so do not move anything
}
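// Periodically polls resolution metrics from all resolvers and, when the load difference exceeds
// MIN_BALANCE_DIFFERENCE, moves key ranges from the busiest resolver to the least busy one via
// ResolutionSplitRequests, publishing the resulting moves through self->resolverChanges.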
ACTOR Future < Void > resolutionBalancing ( Reference < MasterData > self ) {
state CoalescedKeyRangeMap < int > key_resolver ;
key_resolver . insert ( allKeys , 0 ) ;
loop {
2019-06-25 17:47:35 +08:00
wait ( delay ( SERVER_KNOBS - > MIN_BALANCE_TIME , TaskPriority : : ResolutionMetrics ) ) ;
2017-05-26 04:48:44 +08:00
while (self->resolverChanges.get().size())
	wait(self->resolverChanges.onChange());
2019-08-29 05:40:50 +08:00
state std : : vector < Future < ResolutionMetricsReply > > futures ;
2017-05-26 04:48:44 +08:00
for (auto& p : self->resolvers)
	futures.push_back(brokenPromiseToNever(p.metrics.getReply(ResolutionMetricsRequest(), TaskPriority::ResolutionMetrics)));
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( futures ) ) ;
2017-05-26 04:48:44 +08:00
state IndexedSet < std : : pair < int64_t , int > , NoMetric > metrics ;
int64_t total = 0 ;
for ( int i = 0 ; i < futures . size ( ) ; i + + ) {
2019-08-29 05:40:50 +08:00
total + = futures [ i ] . get ( ) . value ;
metrics . insert ( std : : make_pair ( futures [ i ] . get ( ) . value , i ) , NoMetric ( ) ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("ResolverMetric").detail("I", i).detail("Metric", futures[i].get());
2017-05-26 04:48:44 +08:00
}
if ( metrics . lastItem ( ) - > first - metrics . begin ( ) - > first > SERVER_KNOBS - > MIN_BALANCE_DIFFERENCE ) {
try {
state int src = metrics . lastItem ( ) - > second ;
state int dest = metrics . begin ( ) - > second ;
state int64_t amount = std : : min ( metrics . lastItem ( ) - > first - total / self - > resolvers . size ( ) , total / self - > resolvers . size ( ) - metrics . begin ( ) - > first ) / 2 ;
state Standalone < VectorRef < ResolverMoveRef > > movedRanges ;
loop {
state std : : pair < KeyRangeRef , bool > range = findRange ( key_resolver , movedRanges , src , dest ) ;
ResolutionSplitRequest req ;
req . front = range . second ;
req . offset = amount ;
req . range = range . first ;
2019-06-25 17:47:35 +08:00
ResolutionSplitReply split = wait ( brokenPromiseToNever ( self - > resolvers [ metrics . lastItem ( ) - > second ] . split . getReply ( req , TaskPriority : : ResolutionMetrics ) ) ) ;
2017-05-26 04:48:44 +08:00
KeyRangeRef moveRange = range . second ? KeyRangeRef ( range . first . begin , split . key ) : KeyRangeRef ( split . key , range . first . end ) ;
movedRanges . push_back_deep ( movedRanges . arena ( ) , ResolverMoveRef ( moveRange , dest ) ) ;
2019-03-19 06:03:43 +08:00
TraceEvent ( " MovingResolutionRange " ) . detail ( " Src " , src ) . detail ( " Dest " , dest ) . detail ( " Amount " , amount ) . detail ( " StartRange " , range . first ) . detail ( " MoveRange " , moveRange ) . detail ( " Used " , split . used ) . detail ( " KeyResolverRanges " , key_resolver . size ( ) ) ;
2017-05-26 04:48:44 +08:00
amount - = split . used ;
if ( moveRange ! = range . first | | amount < = 0 )
break ;
}
for ( auto & it : movedRanges )
key_resolver . insert ( it . range , it . dest ) ;
//for(auto& it : key_resolver.ranges())
2019-03-19 06:03:43 +08:00
// TraceEvent("KeyResolver").detail("Range", it.range()).detail("Value", it.value());
2017-05-26 04:48:44 +08:00
self - > resolverChangesVersion = self - > version + 1 ;
2020-09-11 08:44:15 +08:00
for ( auto & p : self - > commitProxies ) self - > resolverNeedingChanges . insert ( p . id ( ) ) ;
2017-05-26 04:48:44 +08:00
self - > resolverChanges . set ( movedRanges ) ;
} catch ( Error & e ) {
if ( e . code ( ) ! = error_code_operation_failed )
throw ;
}
}
}
}
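// Errors that the master treats as a normal reason to terminate cleanly; anything else is rethrown by
// masterServer().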
static std : : set < int > const & normalMasterErrors ( ) {
static std : : set < int > s ;
if ( s . empty ( ) ) {
s . insert ( error_code_tlog_stopped ) ;
s . insert ( error_code_master_tlog_failed ) ;
2020-09-11 08:44:15 +08:00
s . insert ( error_code_commit_proxy_failed ) ;
2020-07-15 15:37:41 +08:00
s . insert ( error_code_grv_proxy_failed ) ;
2017-05-26 04:48:44 +08:00
s . insert ( error_code_master_resolver_failed ) ;
2019-05-21 05:22:31 +08:00
s . insert ( error_code_master_backup_worker_failed ) ;
2017-05-26 04:48:44 +08:00
s . insert ( error_code_recruitment_failed ) ;
s . insert ( error_code_no_more_servers ) ;
s . insert ( error_code_master_recovery_failed ) ;
s . insert ( error_code_coordinated_state_conflict ) ;
s . insert ( error_code_master_max_versions_in_flight ) ;
s . insert ( error_code_worker_removed ) ;
s . insert ( error_code_new_coordinators_timed_out ) ;
2018-04-25 07:10:14 +08:00
s . insert ( error_code_broken_promise ) ;
2017-05-26 04:48:44 +08:00
}
return s ;
}
2017-09-08 06:32:08 +08:00
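// Handles ChangeCoordinatorsRequests: waits for any outstanding coordinated-state write to finish, then
// moves the coordinated state to the new coordinators. The move is not expected to return normally
// (note the trailing internal_error), since completing it ends this master's epoch.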
ACTOR Future < Void > changeCoordinators ( Reference < MasterData > self ) {
2017-05-26 04:48:44 +08:00
loop {
ChangeCoordinatorsRequest req = waitNext ( self - > myInterface . changeCoordinators . getFuture ( ) ) ;
state ChangeCoordinatorsRequest changeCoordinatorsRequest = req ;
2017-10-20 06:36:32 +08:00
while (!self->cstate.previousWrite.isReady()) {
	wait(self->cstate.previousWrite);
	wait(delay(0)); // if a new core state is ready to be written, have that take priority over our finalizing write
}
if ( ! self - > cstate . fullyRecovered . isSet ( ) ) {
2018-08-11 04:57:10 +08:00
wait ( self - > cstate . write ( self - > cstate . myDBState , true ) ) ;
2017-10-20 06:36:32 +08:00
}
2017-05-26 04:48:44 +08:00
try {
2018-08-11 04:57:10 +08:00
wait ( self - > cstate . move ( ClusterConnectionString ( changeCoordinatorsRequest . newConnectionString . toString ( ) ) ) ) ;
2017-05-26 04:48:44 +08:00
}
catch ( Error & e ) {
if ( e . code ( ) ! = error_code_actor_cancelled )
changeCoordinatorsRequest . reply . sendError ( e ) ;
throw ;
}
throw internal_error ( ) ;
}
}
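// Once recovery is fully complete, simply acknowledges TLog rejoin requests received on the master
// interface.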
ACTOR Future < Void > rejoinRequestHandler ( Reference < MasterData > self ) {
loop {
TLogRejoinRequest req = waitNext ( self - > myInterface . tlogRejoin . getFuture ( ) ) ;
req . reply . send ( true ) ;
}
}
2019-07-13 04:10:21 +08:00
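// Repeatedly writes the current DBCoreState to the coordinated state as the log system changes,
// advancing recoveryState (ALL_LOGS_RECRUITED, STORAGE_RECOVERED, FULLY_RECOVERED) as conditions are
// met, and returns once the final, fully-recovered state has been written.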
ACTOR Future < Void > trackTlogRecovery ( Reference < MasterData > self , Reference < AsyncVar < Reference < ILogSystem > > > oldLogSystems , Future < Void > minRecoveryDuration ) {
2017-05-26 04:48:44 +08:00
state Future < Void > rejoinRequests = Never ( ) ;
2017-09-08 06:32:08 +08:00
state DBRecoveryCount recoverCount = self - > cstate . myDBState . recoveryCount + 1 ;
2017-05-26 04:48:44 +08:00
loop {
2017-09-08 06:32:08 +08:00
state DBCoreState newState ;
self - > logSystem - > toCoreState ( newState ) ;
newState . recoveryCount = recoverCount ;
2018-02-20 08:49:57 +08:00
state Future < Void > changed = self - > logSystem - > onCoreStateChanged ( ) ;
2017-09-08 06:32:08 +08:00
ASSERT ( newState . tLogs [ 0 ] . tLogWriteAntiQuorum = = self - > configuration . tLogWriteAntiQuorum & & newState . tLogs [ 0 ] . tLogReplicationFactor = = self - > configuration . tLogReplicationFactor ) ;
2018-04-17 01:14:39 +08:00
state bool allLogs = newState . tLogs . size ( ) = = self - > configuration . expectedLogSets ( self - > primaryDcId . size ( ) ? self - > primaryDcId [ 0 ] : Optional < Key > ( ) ) ;
state bool finalUpdate = ! newState . oldTLogData . size ( ) & & allLogs ;
2018-08-11 04:57:10 +08:00
wait ( self - > cstate . write ( newState , finalUpdate ) ) ;
2019-07-13 04:10:21 +08:00
wait ( minRecoveryDuration ) ;
2018-04-09 12:24:05 +08:00
self - > logSystem - > coreStateWritten ( newState ) ;
2018-04-11 02:14:57 +08:00
if ( self - > cstateUpdated . canBeSet ( ) ) {
self - > cstateUpdated . send ( Void ( ) ) ;
}
2018-07-04 13:59:04 +08:00
2017-10-25 06:09:31 +08:00
if (finalUpdate) {
	self->recoveryState = RecoveryState::FULLY_RECOVERED;
	TraceEvent("MasterRecoveryState", self->dbgid)
	    .detail("StatusCode", RecoveryStatus::fully_recovered)
	    .detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
	    .detail("FullyRecoveredAtVersion", self->version)
	    .trackLatest("MasterRecoveryState");

	TraceEvent("MasterRecoveryGenerations", self->dbgid)
	    .detail("ActiveGenerations", 1)
	    .trackLatest("MasterRecoveryGenerations");
} else if (!newState.oldTLogData.size() && self->recoveryState < RecoveryState::STORAGE_RECOVERED) {
	self->recoveryState = RecoveryState::STORAGE_RECOVERED;
	TraceEvent("MasterRecoveryState", self->dbgid)
	    .detail("StatusCode", RecoveryStatus::storage_recovered)
	    .detail("Status", RecoveryStatus::names[RecoveryStatus::storage_recovered])
	    .trackLatest("MasterRecoveryState");
} else if (allLogs && self->recoveryState < RecoveryState::ALL_LOGS_RECRUITED) {
	self->recoveryState = RecoveryState::ALL_LOGS_RECRUITED;
	TraceEvent("MasterRecoveryState", self->dbgid)
	    .detail("StatusCode", RecoveryStatus::all_logs_recruited)
	    .detail("Status", RecoveryStatus::names[RecoveryStatus::all_logs_recruited])
	    .trackLatest("MasterRecoveryState");
}
2018-07-04 13:59:04 +08:00
2018-07-15 07:26:45 +08:00
if ( newState . oldTLogData . size ( ) & & self - > configuration . repopulateRegionAntiQuorum > 0 & & self - > logSystem - > remoteStorageRecovered ( ) ) {
TraceEvent ( SevWarnAlways , " RecruitmentStalled_RemoteStorageRecovered " , self - > dbgid ) ;
self - > recruitmentStalled - > set ( true ) ;
}
2017-09-08 06:32:08 +08:00
self - > registrationTrigger . trigger ( ) ;
2018-04-17 01:14:39 +08:00
2017-09-08 06:32:08 +08:00
if ( finalUpdate ) {
2017-05-26 04:48:44 +08:00
oldLogSystems - > get ( ) - > stopRejoins ( ) ;
rejoinRequests = rejoinRequestHandler ( self ) ;
2017-09-08 06:32:08 +08:00
return Void ( ) ;
2017-05-26 04:48:44 +08:00
}
2018-08-11 04:57:10 +08:00
wait ( changed ) ;
2017-05-26 04:48:44 +08:00
}
}
2020-01-17 13:21:25 +08:00
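// Watches the stored database configuration after recovery; if it changes, updates self->configuration
// and re-registers with the cluster controller, or forces a new recovery via master_recovery_failed()
// if the master has not yet reached ALL_LOGS_RECRUITED.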
ACTOR Future < Void > configurationMonitor ( Reference < MasterData > self , Database cx ) {
2017-11-15 05:57:37 +08:00
loop {
state ReadYourWritesTransaction tr ( cx ) ;
loop {
try {
tr . setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
2018-07-06 03:17:41 +08:00
Standalone < RangeResultRef > results = wait ( tr . getRange ( configKeys , CLIENT_KNOBS - > TOO_MANY ) ) ;
2017-11-15 05:57:37 +08:00
ASSERT ( ! results . more & & results . size ( ) < CLIENT_KNOBS - > TOO_MANY ) ;
DatabaseConfiguration conf ;
conf . fromKeyValues ( ( VectorRef < KeyValueRef > ) results ) ;
if ( conf ! = self - > configuration ) {
2018-07-06 03:17:41 +08:00
if ( self - > recoveryState ! = RecoveryState : : ALL_LOGS_RECRUITED & & self - > recoveryState ! = RecoveryState : : FULLY_RECOVERED ) {
throw master_recovery_failed ( ) ;
}
2017-11-15 05:57:37 +08:00
self - > configuration = conf ;
self - > registrationTrigger . trigger ( ) ;
}
2019-10-04 00:44:08 +08:00
state Future < Void > watchFuture = tr . watch ( moveKeysLockOwnerKey ) | | tr . watch ( excludedServersVersionKey ) | | tr . watch ( failedServersVersionKey ) ;
2018-08-11 04:57:10 +08:00
wait ( tr . commit ( ) ) ;
wait ( watchFuture ) ;
2017-11-15 05:57:37 +08:00
break ;
} catch ( Error & e ) {
2018-08-11 04:57:10 +08:00
wait ( tr . onError ( e ) ) ;
2017-11-15 05:57:37 +08:00
}
}
}
}
2020-03-22 04:44:02 +08:00
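// Reads backupStartedKey and returns the smallest start version among all active backups, or an empty
// Optional if no backup is running.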
ACTOR static Future < Optional < Version > > getMinBackupVersion ( Reference < MasterData > self , Database cx ) {
loop {
state ReadYourWritesTransaction tr ( cx ) ;
try {
tr . setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr . setOption ( FDBTransactionOptions : : LOCK_AWARE ) ;
Optional < Value > value = wait ( tr . get ( backupStartedKey ) ) ;
Optional < Version > minVersion ;
if ( value . present ( ) ) {
auto uidVersions = decodeBackupStartedValue ( value . get ( ) ) ;
TraceEvent e ( " GotBackupStartKey " , self - > dbgid ) ;
int i = 1 ;
for ( auto [ uid , version ] : uidVersions ) {
e . detail ( format ( " BackupID%d " , i ) , uid ) . detail ( format ( " Version%d " , i ) , version ) ;
i + + ;
minVersion = minVersion . present ( ) ? std : : min ( version , minVersion . get ( ) ) : version ;
}
} else {
TraceEvent ( " EmptyBackupStartKey " , self - > dbgid ) ;
}
return minVersion ;
} catch ( Error & e ) {
wait ( tr . onError ( e ) ) ;
}
}
}
2020-01-17 13:21:25 +08:00
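// Recruits one backup worker per log router tag for the current epoch, plus additional workers for any
// unfinished ranges of older epochs reported by BackupProgress, then registers the recruits with the
// log system.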
ACTOR static Future < Void > recruitBackupWorkers ( Reference < MasterData > self , Database cx ) {
ASSERT ( self - > backupWorkers . size ( ) > 0 ) ;
2019-07-24 02:45:04 +08:00
2020-03-06 03:34:37 +08:00
// Avoid race between a backup worker's save progress and the reads below.
wait ( delay ( SERVER_KNOBS - > SECONDS_BEFORE_RECRUIT_BACKUP_WORKER ) ) ;
2019-07-30 01:37:42 +08:00
state LogEpoch epoch = self - > cstate . myDBState . recoveryCount ;
2019-10-01 04:16:28 +08:00
state Reference < BackupProgress > backupProgress (
2019-10-02 05:55:08 +08:00
new BackupProgress ( self - > dbgid , self - > logSystem - > getOldEpochTagsVersionsInfo ( ) ) ) ;
2020-06-28 12:30:38 +08:00
state Future < Void > gotProgress = getBackupProgress ( cx , self - > dbgid , backupProgress , /*logging=*/ true ) ;
2020-01-17 11:16:23 +08:00
state std : : vector < Future < InitializeBackupReply > > initializationReplies ;
2019-10-01 04:16:28 +08:00
state std : : vector < std : : pair < UID , Tag > > idsTags ; // worker IDs and tags for current epoch
state int logRouterTags = self - > logSystem - > getLogRouterTags ( ) ;
for ( int i = 0 ; i < logRouterTags ; i + + ) {
idsTags . emplace_back ( deterministicRandom ( ) - > randomUniqueID ( ) , Tag ( tagLocalityLogRouter , i ) ) ;
}
2020-03-09 11:50:32 +08:00
const Version startVersion = self - > logSystem - > getBackupStartVersion ( ) ;
2019-07-30 01:37:42 +08:00
state int i = 0 ;
for ( ; i < logRouterTags ; i + + ) {
2019-07-24 02:45:04 +08:00
const auto & worker = self - > backupWorkers [ i % self - > backupWorkers . size ( ) ] ;
2019-10-01 04:16:28 +08:00
InitializeBackupRequest req ( idsTags [ i ] . first ) ;
2019-07-30 01:37:42 +08:00
req . recruitedEpoch = epoch ;
req . backupEpoch = epoch ;
2019-10-01 04:16:28 +08:00
req . routerTag = idsTags [ i ] . second ;
2020-02-21 08:28:27 +08:00
req . totalTags = logRouterTags ;
2019-07-24 02:45:04 +08:00
req . startVersion = startVersion ;
TraceEvent("BackupRecruitment", self->dbgid)
    .detail("RequestID", req.reqId)
    .detail("Tag", req.routerTag.toString())
    .detail("Epoch", epoch)
    .detail("BackupEpoch", epoch)
    .detail("StartVersion", req.startVersion);
initializationReplies.push_back(
    transformErrors(throwErrorOr(worker.backup.getReplyUnlessFailedFor(
                        req, SERVER_KNOBS->BACKUP_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
                    master_backup_worker_failed()));
2019-07-24 02:45:04 +08:00
}
2020-03-22 04:44:02 +08:00
state Future < Optional < Version > > fMinVersion = getMinBackupVersion ( self , cx ) ;
wait ( gotProgress & & success ( fMinVersion ) ) ;
2020-03-24 11:44:31 +08:00
TraceEvent ( " MinBackupVersion " , self - > dbgid ) . detail ( " Version " , fMinVersion . get ( ) . present ( ) ? fMinVersion . get ( ) : - 1 ) ;
2020-03-22 04:44:02 +08:00
2020-02-21 08:28:27 +08:00
std : : map < std : : tuple < LogEpoch , Version , int > , std : : map < Tag , Version > > toRecruit =
backupProgress - > getUnfinishedBackup ( ) ;
2020-03-21 09:39:51 +08:00
for (const auto& [epochVersionTags, tagVersions] : toRecruit) {
	const Version oldEpochEnd = std::get<1>(epochVersionTags);
	if (!fMinVersion.get().present() || fMinVersion.get().get() + 1 >= oldEpochEnd) {
		TraceEvent("SkipBackupRecruitment", self->dbgid)
		    .detail("MinVersion", fMinVersion.get().present() ? fMinVersion.get() : -1)
		    .detail("Epoch", epoch)
		    .detail("OldEpoch", std::get<0>(epochVersionTags))
		    .detail("OldEpochEnd", oldEpochEnd);
		continue;
	}
2019-08-07 10:20:24 +08:00
for ( const auto & [ tag , version ] : tagVersions ) {
2019-07-30 01:37:42 +08:00
const auto & worker = self - > backupWorkers [ i % self - > backupWorkers . size ( ) ] ;
i + + ;
InitializeBackupRequest req ( deterministicRandom ( ) - > randomUniqueID ( ) ) ;
req . recruitedEpoch = epoch ;
2020-03-21 09:39:51 +08:00
req . backupEpoch = std : : get < 0 > ( epochVersionTags ) ;
2019-08-07 10:20:24 +08:00
req . routerTag = tag ;
2020-03-21 09:39:51 +08:00
req . totalTags = std : : get < 2 > ( epochVersionTags ) ;
2019-08-07 10:20:24 +08:00
req . startVersion = version ; // savedVersion + 1
2020-03-21 09:39:51 +08:00
req . endVersion = std : : get < 1 > ( epochVersionTags ) - 1 ;
2019-07-30 01:37:42 +08:00
TraceEvent("BackupRecruitment", self->dbgid)
    .detail("RequestID", req.reqId)
    .detail("Tag", req.routerTag.toString())
    .detail("Epoch", epoch)
    .detail("BackupEpoch", req.backupEpoch)
    .detail("StartVersion", req.startVersion)
    .detail("EndVersion", req.endVersion.get());
initializationReplies.push_back(transformErrors(
    throwErrorOr(worker.backup.getReplyUnlessFailedFor(req, SERVER_KNOBS->BACKUP_TIMEOUT,
                                                       SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
    master_backup_worker_failed()));
2019-07-30 01:37:42 +08:00
}
}
2020-01-17 11:16:23 +08:00
std : : vector < InitializeBackupReply > newRecruits = wait ( getAll ( initializationReplies ) ) ;
self - > logSystem - > setBackupWorkers ( newRecruits ) ;
2019-07-24 02:45:04 +08:00
TraceEvent ( " BackupRecruitmentDone " , self - > dbgid ) ;
2019-08-13 04:15:15 +08:00
self - > registrationTrigger . trigger ( ) ;
2019-07-24 02:45:04 +08:00
return Void ( ) ;
}
2018-01-06 03:33:42 +08:00
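// Drives the main recovery sequence: read and lock the coordinated state, recruit a new transaction
// subsystem, commit the recovery transaction, and then keep serving versions, coordinator changes, and
// backup worker recruitment until this master is replaced.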
ACTOR Future < Void > masterCore ( Reference < MasterData > self ) {
2017-05-26 04:48:44 +08:00
state TraceInterval recoveryInterval ( " MasterRecovery " ) ;
2017-11-16 04:38:26 +08:00
state double recoverStartTime = now ( ) ;
2017-05-26 04:48:44 +08:00
2017-09-08 06:32:08 +08:00
self - > addActor . send ( waitFailureServer ( self - > myInterface . waitFailure . getFuture ( ) ) ) ;
2017-05-26 04:48:44 +08:00
TraceEvent ( recoveryInterval . begin ( ) , self - > dbgid ) ;
self - > recoveryState = RecoveryState : : READING_CSTATE ;
TraceEvent("MasterRecoveryState", self->dbgid)
    .detail("StatusCode", RecoveryStatus::reading_coordinated_state)
    .detail("Status", RecoveryStatus::names[RecoveryStatus::reading_coordinated_state])
    .trackLatest("MasterRecoveryState");
2017-05-26 04:48:44 +08:00
2018-08-11 04:57:10 +08:00
wait ( self - > cstate . read ( ) ) ;
2017-05-26 04:48:44 +08:00
self - > recoveryState = RecoveryState : : LOCKING_CSTATE ;
TraceEvent("MasterRecoveryState", self->dbgid)
    .detail("StatusCode", RecoveryStatus::locking_coordinated_state)
    .detail("Status", RecoveryStatus::names[RecoveryStatus::locking_coordinated_state])
    .detail("TLogs", self->cstate.prevDBState.tLogs.size())
    .detail("ActiveGenerations", self->cstate.myDBState.oldTLogData.size() + 1)
    .detail("MyRecoveryCount", self->cstate.prevDBState.recoveryCount + 2)
    .detail("ForceRecovery", self->forceRecovery)
    .trackLatest("MasterRecoveryState");
2019-08-11 01:31:25 +08:00
//for (const auto& old : self->cstate.prevDBState.oldTLogData) {
// TraceEvent("BWReadCoreState", self->dbgid).detail("Epoch", old.epoch).detail("Version", old.epochEnd);
//}
2017-05-26 04:48:44 +08:00
2020-03-17 01:29:17 +08:00
TraceEvent("MasterRecoveryGenerations", self->dbgid)
    .detail("ActiveGenerations", self->cstate.myDBState.oldTLogData.size() + 1)
    .trackLatest("MasterRecoveryGenerations");
2020-03-14 01:28:32 +08:00
if (self->cstate.myDBState.oldTLogData.size() > CLIENT_KNOBS->MAX_GENERATIONS_OVERRIDE) {
	if (self->cstate.myDBState.oldTLogData.size() >= CLIENT_KNOBS->MAX_GENERATIONS) {
		TraceEvent(SevError, "RecoveryStoppedTooManyOldGenerations")
		    .detail("OldGenerations", self->cstate.myDBState.oldTLogData.size())
		    .detail("Reason", "Recovery stopped because too many recoveries have happened since the last time the cluster was fully_recovered. Set --knob_max_generations_override on your server processes to a value larger than OldGenerations to resume recovery once the underlying problem has been fixed.");
		wait(Future<Void>(Never()));
	} else if (self->cstate.myDBState.oldTLogData.size() > CLIENT_KNOBS->RECOVERY_DELAY_START_GENERATION) {
		TraceEvent(SevError, "RecoveryDelayedTooManyOldGenerations")
		    .detail("OldGenerations", self->cstate.myDBState.oldTLogData.size())
		    .detail("Reason", "Recovery is delayed because too many recoveries have happened since the last time the cluster was fully_recovered. Set --knob_max_generations_override on your server processes to a value larger than OldGenerations to resume recovery once the underlying problem has been fixed.");
		wait(delay(CLIENT_KNOBS->RECOVERY_DELAY_SECONDS_PER_GENERATION * (self->cstate.myDBState.oldTLogData.size() - CLIENT_KNOBS->RECOVERY_DELAY_START_GENERATION)));
	}
}
2017-05-26 04:48:44 +08:00
state Reference < AsyncVar < Reference < ILogSystem > > > oldLogSystems ( new AsyncVar < Reference < ILogSystem > > ) ;
2019-02-19 07:13:18 +08:00
state Future < Void > recoverAndEndEpoch = ILogSystem : : recoverAndEndEpoch ( oldLogSystems , self - > dbgid , self - > cstate . prevDBState , self - > myInterface . tlogRejoin . getFuture ( ) , self - > myInterface . locality , & self - > forceRecovery ) ;
2017-05-26 04:48:44 +08:00
2017-09-08 06:32:08 +08:00
DBCoreState newState = self - > cstate . myDBState ;
newState . recoveryCount + + ;
2018-08-11 04:57:10 +08:00
wait ( self - > cstate . write ( newState ) | | recoverAndEndEpoch ) ;
2017-05-26 04:48:44 +08:00
self - > recoveryState = RecoveryState : : RECRUITING ;
state vector < StorageServerInterface > seedServers ;
state vector < Standalone < CommitTransactionRef > > initialConfChanges ;
state Future < Void > logChanges ;
2019-07-13 04:10:21 +08:00
state Future < Void > minRecoveryDuration ;
2019-08-06 08:01:48 +08:00
state Future < Version > poppedTxsVersion ;
2017-05-26 04:48:44 +08:00
loop {
Reference < ILogSystem > oldLogSystem = oldLogSystems - > get ( ) ;
2019-07-13 04:10:21 +08:00
if ( oldLogSystem ) {
logChanges = triggerUpdates ( self , oldLogSystem ) ;
if ( ! minRecoveryDuration . isValid ( ) ) {
2019-07-13 08:58:16 +08:00
minRecoveryDuration = delay ( SERVER_KNOBS - > ENFORCED_MIN_RECOVERY_DURATION ) ;
2019-08-06 08:01:48 +08:00
poppedTxsVersion = oldLogSystem - > getTxsPoppedVersion ( ) ;
2019-07-13 04:10:21 +08:00
}
}
2017-05-26 04:48:44 +08:00
state Future < Void > reg = oldLogSystem ? updateRegistration ( self , oldLogSystem ) : Never ( ) ;
self - > registrationTrigger . trigger ( ) ;
choose {
2019-08-06 08:01:48 +08:00
when ( wait ( oldLogSystem ? recoverFrom ( self , oldLogSystem , & seedServers , & initialConfChanges , poppedTxsVersion ) : Never ( ) ) ) { reg . cancel ( ) ; break ; }
2018-08-11 04:57:10 +08:00
when ( wait ( oldLogSystems - > onChange ( ) ) ) { }
when ( wait ( reg ) ) { throw internal_error ( ) ; }
2020-03-24 03:40:00 +08:00
when ( wait ( recoverAndEndEpoch ) ) { throw internal_error ( ) ; }
2017-05-26 04:48:44 +08:00
}
}
2018-11-05 15:07:56 +08:00
if ( self - > neverCreated ) {
recoverStartTime = now ( ) ;
}
2017-05-26 04:48:44 +08:00
recoverAndEndEpoch . cancel ( ) ;
2020-09-11 08:44:15 +08:00
ASSERT ( self - > commitProxies . size ( ) < = self - > configuration . getDesiredCommitProxies ( ) ) ;
ASSERT ( self - > commitProxies . size ( ) > = 1 ) ;
2020-08-06 15:01:57 +08:00
ASSERT ( self - > grvProxies . size ( ) < = self - > configuration . getDesiredGrvProxies ( ) ) ;
ASSERT ( self - > grvProxies . size ( ) > = 1 ) ;
2017-05-26 04:48:44 +08:00
ASSERT ( self - > resolvers . size ( ) < = self - > configuration . getDesiredResolvers ( ) ) ;
2020-07-23 03:20:22 +08:00
ASSERT ( self - > resolvers . size ( ) > = 1 ) ;
2017-05-26 04:48:44 +08:00
self - > recoveryState = RecoveryState : : RECOVERY_TRANSACTION ;
TraceEvent("MasterRecoveryState", self->dbgid)
    .detail("StatusCode", RecoveryStatus::recovery_transaction)
    .detail("Status", RecoveryStatus::names[RecoveryStatus::recovery_transaction])
    .detail("PrimaryLocality", self->primaryLocality)
    .detail("DcId", self->myInterface.locality.dcId())
    .trackLatest("MasterRecoveryState");
2017-05-26 04:48:44 +08:00
// Recovery transaction
state bool debugResult = debug_checkMinRestoredVersion ( UID ( ) , self - > lastEpochEnd , " DBRecovery " , SevWarn ) ;
CommitTransactionRequest recoveryCommitRequest ;
2018-02-10 10:21:29 +08:00
recoveryCommitRequest . flags = recoveryCommitRequest . flags | CommitTransactionRequest : : FLAG_IS_LOCK_AWARE ;
2017-05-26 04:48:44 +08:00
CommitTransactionRef & tr = recoveryCommitRequest . transaction ;
int mmApplied = 0 ; // The number of mutations in tr.mutations that have been applied to the txnStateStore so far
if ( self - > lastEpochEnd ! = 0 ) {
2020-09-03 03:44:55 +08:00
Optional < Value > snapRecoveryFlag = self - > txnStateStore - > readValue ( writeRecoveryKey ) . get ( ) ;
2020-11-06 03:23:08 +08:00
TraceEvent("MasterRecoverySnapshotCheck")
    .detail("SnapRecoveryFlag", snapRecoveryFlag.present() ? snapRecoveryFlag.get().toString() : "N/A")
    .detail("LastEpochEnd", self->lastEpochEnd);
2020-09-03 03:44:55 +08:00
if ( snapRecoveryFlag . present ( ) ) {
2020-09-29 04:40:38 +08:00
TEST ( true ) ; // Recovering from snapshot, writing to snapShotEndVersionKey
2020-09-03 03:44:55 +08:00
BinaryWriter bw ( Unversioned ( ) ) ;
tr . set ( recoveryCommitRequest . arena , snapshotEndVersionKey , ( bw < < self - > lastEpochEnd ) . toValue ( ) ) ;
2020-11-03 05:58:08 +08:00
// Pause the backups that got restored in this snapshot to avoid data corruption
// Requires further operational work to abort the backup
2020-11-06 03:23:08 +08:00
TraceEvent ( " MasterRecoveryPauseBackupAgents " ) ;
2020-11-03 05:58:08 +08:00
Key backupPauseKey =
Subspace ( fileBackupPrefixRange . begin ) . get ( BackupAgentBase : : keyTasks ) . pack ( LiteralStringRef ( " pause " ) ) ;
tr . set ( recoveryCommitRequest . arena , backupPauseKey , StringRef ( ) ) ;
2020-09-03 03:44:55 +08:00
// Clear the key so multiple recoveries will not overwrite the first version recorded
tr . clear ( recoveryCommitRequest . arena , singleKeyRange ( writeRecoveryKey ) ) ;
}
2019-02-19 06:30:51 +08:00
if ( self - > forceRecovery ) {
BinaryWriter bw ( Unversioned ( ) ) ;
2019-03-29 02:52:50 +08:00
tr . set ( recoveryCommitRequest . arena , killStorageKey , ( bw < < self - > safeLocality ) . toValue ( ) ) ;
2019-02-19 06:30:51 +08:00
}
2017-05-26 04:48:44 +08:00
// This transaction sets \xff/lastEpochEnd, which the shard servers can use to roll back speculatively
// processed semi-committed transactions from the previous epoch.
// It also guarantees the shard servers and tlog servers eventually get versions in the new epoch, which
// clients might rely on.
// This transaction is by itself in a batch (has its own version number), which simplifies storage servers slightly (they assume there are no modifications to serverKeys in the same batch)
// The proxy also expects the lastEpochEndKey mutation to be first in the transaction
BinaryWriter bw ( Unversioned ( ) ) ;
2019-03-29 02:52:50 +08:00
tr . set ( recoveryCommitRequest . arena , lastEpochEndKey , ( bw < < self - > lastEpochEnd ) . toValue ( ) ) ;
2019-02-19 07:27:18 +08:00
if ( self - > forceRecovery ) {
tr . set ( recoveryCommitRequest . arena , rebootWhenDurableKey , StringRef ( ) ) ;
2019-05-11 05:01:52 +08:00
tr . set ( recoveryCommitRequest . arena , moveKeysLockOwnerKey , BinaryWriter : : toValue ( deterministicRandom ( ) - > randomUniqueID ( ) , Unversioned ( ) ) ) ;
2019-02-19 07:27:18 +08:00
}
2017-05-26 04:48:44 +08:00
} else {
// Recruit and seed initial shard servers
// This transaction must be the very first one in the database (version 1)
seedShardServers ( recoveryCommitRequest . arena , tr , seedServers ) ;
}
// initialConfChanges have not been conflict checked against any earlier writes in the recovery transaction, so do this as early as possible in the recovery transaction
// but see above comments as to why it can't be absolutely first. Theoretically emergency transactions should conflict check against the lastEpochEndKey.
for ( auto & itr : initialConfChanges ) {
tr . mutations . append_deep ( recoveryCommitRequest . arena , itr . mutations . begin ( ) , itr . mutations . size ( ) ) ;
tr . write_conflict_ranges . append_deep ( recoveryCommitRequest . arena , itr . write_conflict_ranges . begin ( ) , itr . write_conflict_ranges . size ( ) ) ;
}
2019-02-19 06:40:30 +08:00
tr . set ( recoveryCommitRequest . arena , primaryLocalityKey , BinaryWriter : : toValue ( self - > primaryLocality , Unversioned ( ) ) ) ;
2017-05-26 04:48:44 +08:00
tr . set ( recoveryCommitRequest . arena , backupVersionKey , backupVersionValue ) ;
tr . set ( recoveryCommitRequest . arena , coordinatorsKey , self - > coordinators . ccf - > getConnectionString ( ) . toString ( ) ) ;
tr . set ( recoveryCommitRequest . arena , logsKey , self - > logSystem - > getLogsValue ( ) ) ;
2018-06-15 10:36:02 +08:00
tr . set ( recoveryCommitRequest . arena , primaryDatacenterKey , self - > myInterface . locality . dcId ( ) . present ( ) ? self - > myInterface . locality . dcId ( ) . get ( ) : StringRef ( ) ) ;
2017-05-26 04:48:44 +08:00
2018-11-12 04:37:53 +08:00
tr . clear ( recoveryCommitRequest . arena , tLogDatacentersKeys ) ;
for ( auto & dc : self - > primaryDcId ) {
tr . set ( recoveryCommitRequest . arena , tLogDatacentersKeyFor ( dc ) , StringRef ( ) ) ;
}
2018-11-14 04:36:04 +08:00
if ( self - > configuration . usableRegions > 1 ) {
for ( auto & dc : self - > remoteDcIds ) {
tr . set ( recoveryCommitRequest . arena , tLogDatacentersKeyFor ( dc ) , StringRef ( ) ) ;
}
2018-11-12 04:37:53 +08:00
}
2020-08-28 07:16:05 +08:00
applyMetadataMutations(SpanID(), self->dbgid, recoveryCommitRequest.arena,
                       tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore);
2017-05-26 04:48:44 +08:00
mmApplied = tr . mutations . size ( ) ;
tr . read_snapshot = self - > recoveryTransactionVersion ; // lastEpochEnd would make more sense, but isn't in the initial window of the resolver(s)
TraceEvent ( " MasterRecoveryCommit " , self - > dbgid ) ;
2020-09-11 08:44:15 +08:00
state Future < ErrorOr < CommitID > > recoveryCommit = self - > commitProxies [ 0 ] . commit . tryGetReply ( recoveryCommitRequest ) ;
2018-04-11 04:31:24 +08:00
self - > addActor . send ( self - > logSystem - > onError ( ) ) ;
self - > addActor . send ( waitResolverFailure ( self - > resolvers ) ) ;
2020-09-16 13:29:49 +08:00
self - > addActor . send ( waitCommitProxyFailure ( self - > commitProxies ) ) ;
2020-07-15 15:37:41 +08:00
self - > addActor . send ( waitGrvProxyFailure ( self - > grvProxies ) ) ;
2018-04-11 04:31:24 +08:00
self - > addActor . send ( provideVersions ( self ) ) ;
2020-06-10 02:09:46 +08:00
self - > addActor . send ( serveLiveCommittedVersion ( self ) ) ;
2018-06-09 02:11:08 +08:00
self - > addActor . send ( reportErrors ( updateRegistration ( self , self - > logSystem ) , " UpdateRegistration " , self - > dbgid ) ) ;
2017-05-26 04:48:44 +08:00
self - > registrationTrigger . trigger ( ) ;
2018-08-11 04:57:10 +08:00
wait ( discardCommit ( self - > txnStateStore , self - > txnStateLogAdapter ) ) ;
2017-05-26 04:48:44 +08:00
// Wait for the recovery transaction to complete.
// SOMEDAY: For faster recovery, do this and setDBState asynchronously and don't wait for them
// unless we want to change TLogs
2018-08-11 04:57:10 +08:00
wait ( ( success ( recoveryCommit ) & & sendInitialCommitToResolvers ( self ) ) ) ;
2019-07-28 07:46:22 +08:00
if ( recoveryCommit . isReady ( ) & & recoveryCommit . get ( ) . isError ( ) ) {
2017-05-26 04:48:44 +08:00
TEST(true); // Master recovery failed because the initial commit failed
throw master_recovery_failed ( ) ;
}
ASSERT ( self - > recoveryTransactionVersion ! = 0 ) ;
self - > recoveryState = RecoveryState : : WRITING_CSTATE ;
TraceEvent("MasterRecoveryState", self->dbgid)
    .detail("StatusCode", RecoveryStatus::writing_coordinated_state)
    .detail("Status", RecoveryStatus::names[RecoveryStatus::writing_coordinated_state])
    .detail("TLogList", self->logSystem->describe())
    .trackLatest("MasterRecoveryState");
2017-05-26 04:48:44 +08:00
// Multiple masters prevent conflicts between themselves via CoordinatedState (self->cstate)
// 1. If SetMaster succeeds, then by CS's contract, these "new" Tlogs are the immediate
// successors of the "old" ones we are replacing
// 2. logSystem->recoverAndEndEpoch ensured that a co-quorum of the "old" tLogs were stopped at
// versions <= self->lastEpochEnd, so no versions > self->lastEpochEnd could be (fully) committed to them.
// 3. No other master will attempt to commit anything to our "new" Tlogs
// because it didn't recruit them
// 4. Therefore, no full commit can come between self->lastEpochEnd and the first commit
// we made to the new Tlogs (self->recoveryTransactionVersion), and only our own semi-commits can come between our
// first commit and the next new TLogs
2019-07-13 04:10:21 +08:00
self - > addActor . send ( trackTlogRecovery ( self , oldLogSystems , minRecoveryDuration ) ) ;
2017-05-26 04:48:44 +08:00
debug_advanceMaxCommittedVersion ( UID ( ) , self - > recoveryTransactionVersion ) ;
2018-08-11 04:57:10 +08:00
wait ( self - > cstateUpdated . getFuture ( ) ) ;
2017-05-26 04:48:44 +08:00
debug_advanceMinCommittedVersion ( UID ( ) , self - > recoveryTransactionVersion ) ;
2018-09-18 09:32:39 +08:00
if ( debugResult ) {
TraceEvent ( self - > forceRecovery ? SevWarn : SevError , " DBRecoveryDurabilityError " ) ;
}
2017-05-26 04:48:44 +08:00
2017-09-08 06:32:08 +08:00
TraceEvent ( " MasterCommittedTLogs " , self - > dbgid ) . detail ( " TLogs " , self - > logSystem - > describe ( ) ) . detail ( " RecoveryCount " , self - > cstate . myDBState . recoveryCount ) . detail ( " RecoveryTransactionVersion " , self - > recoveryTransactionVersion ) ;
2017-05-26 04:48:44 +08:00
TraceEvent ( recoveryInterval . end ( ) , self - > dbgid ) . detail ( " RecoveryTransactionVersion " , self - > recoveryTransactionVersion ) ;
2018-07-05 15:08:51 +08:00
self - > recoveryState = RecoveryState : : ACCEPTING_COMMITS ;
2017-11-16 04:38:26 +08:00
double recoveryDuration = now ( ) - recoverStartTime ;
2018-01-08 07:37:45 +08:00
TraceEvent((recoveryDuration > 4 && !g_network->isSimulated()) ? SevWarnAlways : SevInfo, "MasterRecoveryDuration", self->dbgid)
    .detail("RecoveryDuration", recoveryDuration)
    .trackLatest("MasterRecoveryDuration");
2017-05-26 04:48:44 +08:00
TraceEvent("MasterRecoveryState", self->dbgid)
    .detail("StatusCode", RecoveryStatus::accepting_commits)
    .detail("Status", RecoveryStatus::names[RecoveryStatus::accepting_commits])
    .detail("StoreType", self->configuration.storageServerStoreType)
    .detail("RecoveryDuration", recoveryDuration)
    .trackLatest("MasterRecoveryState");
2017-05-26 04:48:44 +08:00
2020-09-24 01:54:49 +08:00
TraceEvent ( " MasterRecoveryAvailable " , self - > dbgid )
. detail ( " AvailableAtVersion " , self - > version )
. trackLatest ( " MasterRecoveryAvailable " ) ;
2020-09-18 00:55:25 +08:00
2017-05-26 04:48:44 +08:00
if ( self - > resolvers . size ( ) > 1 )
2017-09-08 06:32:08 +08:00
self - > addActor . send ( resolutionBalancing ( self ) ) ;
2017-05-26 04:48:44 +08:00
2017-09-08 06:32:08 +08:00
self - > addActor . send ( changeCoordinators ( self ) ) ;
2020-01-17 13:21:25 +08:00
Database cx = openDBOnServer ( self - > dbInfo , TaskPriority : : DefaultEndpoint , true , true ) ;
self - > addActor . send ( configurationMonitor ( self , cx ) ) ;
2020-02-05 02:09:16 +08:00
if ( self - > configuration . backupWorkerEnabled ) {
2020-01-28 05:14:52 +08:00
self - > addActor . send ( recruitBackupWorkers ( self , cx ) ) ;
2020-03-21 04:58:20 +08:00
} else {
self - > logSystem - > setOldestBackupEpoch ( self - > cstate . myDBState . recoveryCount ) ;
2020-01-28 05:14:52 +08:00
}
2017-05-26 04:48:44 +08:00
2018-08-11 04:57:10 +08:00
wait ( Future < Void > ( Never ( ) ) ) ;
2018-04-11 04:31:24 +08:00
throw internal_error ( ) ;
2017-05-26 04:48:44 +08:00
}
2020-07-18 05:59:38 +08:00
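// Top-level master actor: waits until the cluster controller interface in dbInfo matches the one this
// master was recruited by (or times out), runs masterCore(), and translates the expected failure modes
// listed in normalMasterErrors() into a clean shutdown.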
ACTOR Future < Void > masterServer ( MasterInterface mi , Reference < AsyncVar < ServerDBInfo > > db , Reference < AsyncVar < Optional < ClusterControllerFullInterface > > > ccInterface , ServerCoordinators coordinators , LifetimeToken lifetime , bool forceRecovery )
2017-05-26 04:48:44 +08:00
{
2020-07-21 02:29:37 +08:00
state Future < Void > ccTimeout = delay ( SERVER_KNOBS - > CC_INTERFACE_TIMEOUT ) ;
while ( ! ccInterface - > get ( ) . present ( ) | | db - > get ( ) . clusterInterface ! = ccInterface - > get ( ) . get ( ) ) {
wait ( ccInterface - > onChange ( ) | | db - > onChange ( ) | | ccTimeout ) ;
if ( ccTimeout . isReady ( ) ) {
TraceEvent("MasterTerminated", mi.id())
    .detail("Reason", "Timeout")
    .detail("CCInterface", ccInterface->get().present() ? ccInterface->get().get().id() : UID())
    .detail("DBInfoInterface", db->get().clusterInterface.id());
return Void ( ) ;
}
}
2017-05-26 04:48:44 +08:00
state Future < Void > onDBChange = Void ( ) ;
2017-09-08 06:32:08 +08:00
state PromiseStream < Future < Void > > addActor ;
2018-08-17 01:24:12 +08:00
state Reference < MasterData > self ( new MasterData ( db , mi , coordinators , db - > get ( ) . clusterInterface , LiteralStringRef ( " " ) , addActor , forceRecovery ) ) ;
2017-09-08 06:32:08 +08:00
state Future < Void > collection = actorCollection ( self - > addActor . getFuture ( ) ) ;
2020-05-09 07:27:57 +08:00
self - > addActor . send ( traceRole ( Role : : MASTER , mi . id ( ) ) ) ;
2017-05-26 04:48:44 +08:00
TEST ( ! lifetime . isStillValid ( db - > get ( ) . masterLifetime , mi . id ( ) = = db - > get ( ) . master . id ( ) ) ) ; // Master born doomed
TraceEvent ( " MasterLifetime " , self - > dbgid ) . detail ( " LifetimeToken " , lifetime . toString ( ) ) ;
try {
2017-09-08 06:32:08 +08:00
state Future < Void > core = masterCore ( self ) ;
2017-05-26 04:48:44 +08:00
loop choose {
2018-08-11 04:57:10 +08:00
when ( wait ( core ) ) { break ; }
when ( wait ( onDBChange ) ) {
2017-05-26 04:48:44 +08:00
onDBChange = db - > onChange ( ) ;
if ( ! lifetime . isStillValid ( db - > get ( ) . masterLifetime , mi . id ( ) = = db - > get ( ) . master . id ( ) ) ) {
TraceEvent ( " MasterTerminated " , mi . id ( ) ) . detail ( " Reason " , " LifetimeToken " ) . detail ( " MyToken " , lifetime . toString ( ) ) . detail ( " CurrentToken " , db - > get ( ) . masterLifetime . toString ( ) ) ;
TEST ( true ) ; // Master replaced, dying
2018-08-11 04:57:10 +08:00
if ( BUGGIFY ) wait ( delay ( 5 ) ) ;
2017-05-26 04:48:44 +08:00
throw worker_removed ( ) ;
}
}
2019-08-15 08:00:20 +08:00
when ( BackupWorkerDoneRequest req = waitNext ( mi . notifyBackupWorkerDone . getFuture ( ) ) ) {
2019-08-15 13:07:25 +08:00
if ( self - > logSystem . isValid ( ) & & self - > logSystem - > removeBackupWorker ( req ) ) {
2019-08-15 08:00:20 +08:00
self - > registrationTrigger . trigger ( ) ;
}
2020-01-17 13:21:25 +08:00
req . reply . send ( Void ( ) ) ;
2019-08-15 08:00:20 +08:00
}
2018-08-11 04:57:10 +08:00
when ( wait ( collection ) ) { ASSERT ( false ) ; throw internal_error ( ) ; }
2017-05-26 04:48:44 +08:00
}
} catch ( Error & e ) {
2018-04-25 07:10:14 +08:00
state Error err = e ;
if ( e . code ( ) ! = error_code_actor_cancelled ) {
2018-08-11 04:57:10 +08:00
wait ( delay ( 0.0 ) ) ;
2018-04-25 07:10:14 +08:00
}
while ( ! self - > addActor . isEmpty ( ) ) {
self - > addActor . getFuture ( ) . pop ( ) ;
}
2020-01-25 03:03:25 +08:00
2018-04-25 07:10:14 +08:00
TEST ( err . code ( ) = = error_code_master_tlog_failed ) ; // Master: terminated because of a tLog failure
2020-09-11 08:44:15 +08:00
TEST ( err . code ( ) = = error_code_commit_proxy_failed ) ; // Master: terminated because of a commit proxy failure
2020-07-15 15:37:41 +08:00
TEST ( err . code ( ) = = error_code_grv_proxy_failed ) ; // Master: terminated because of a GRV proxy failure
2018-04-25 07:10:14 +08:00
TEST ( err . code ( ) = = error_code_master_resolver_failed ) ; // Master: terminated because of a resolver failure
2019-05-21 05:22:31 +08:00
TEST ( err . code ( ) = = error_code_master_backup_worker_failed ) ; // Master: terminated because of a backup worker failure
2017-05-26 04:48:44 +08:00
2019-05-21 05:22:31 +08:00
if ( normalMasterErrors ( ) . count ( err . code ( ) ) ) {
2018-04-25 07:10:14 +08:00
TraceEvent ( " MasterTerminated " , mi . id ( ) ) . error ( err ) ;
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
2018-04-25 07:10:14 +08:00
throw err ;
2017-05-26 04:48:44 +08:00
}
return Void ( ) ;
}