fix: the master did not detect core state changes if it changed while writing
fix: do not attempt to use three_data_hall when in a fearless deployment fix: log router tags are ephemeral and can be cleared after every recovery
This commit is contained in:
parent
1b5628d2c5
commit
9ea963ddd6
|
@ -1004,7 +1004,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, uint16_t minTag, int8_t remoteLocality )
|
||||
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality )
|
||||
{
|
||||
TraceEvent("RemoteLogRecruitment_WaitingForWorkers");
|
||||
state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );
|
||||
|
@ -1016,15 +1016,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->hasBestPolicy = HasBestPolicyId;
|
||||
logSet->locality = remoteLocality;
|
||||
|
||||
//recruit temporary log routers and update registration with them
|
||||
state int tempLogRouters = std::max<int>(remoteWorkers.logRouters.size(), minTag + 1);
|
||||
state vector<Future<TLogInterface>> logRouterInitializationReplies;
|
||||
for( int i = 0; i < tempLogRouters; i++) {
|
||||
for( int i = 0; i < remoteWorkers.logRouters.size(); i++) {
|
||||
InitializeLogRouterRequest req;
|
||||
req.recoveryCount = recoveryCount;
|
||||
req.routerTag = Tag(tagLocalityLogRouter, i);
|
||||
req.logSet = self->tLogs.size();
|
||||
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
|
||||
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
|
||||
}
|
||||
|
||||
TraceEvent("RemoteLogRecruitment_RecruitingLogRouters");
|
||||
|
@ -1150,13 +1148,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
filterLocalityDataForPolicy(logSystem->tLogs[0]->tLogPolicy, &logSystem->tLogs[0]->tLogLocalities);
|
||||
|
||||
std::vector<int> locations;
|
||||
state uint16_t minTag = 0;
|
||||
state std::vector<Tag> oldRouterTags;
|
||||
for( Tag tag : oldLogSystem->getEpochEndTags() ) {
|
||||
minTag = std::min(minTag, tag.id);
|
||||
locations.clear();
|
||||
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
|
||||
for(int loc : locations)
|
||||
reqs[ loc ].recoverTags.push_back( tag );
|
||||
if(tag.locality == tagLocalityLogRouter) {
|
||||
oldRouterTags.push_back(tag);
|
||||
} else {
|
||||
locations.clear();
|
||||
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
|
||||
for(int loc : locations)
|
||||
reqs[ loc ].recoverTags.push_back( tag );
|
||||
}
|
||||
}
|
||||
|
||||
for( int i = 0; i < recr.tLogs.size(); i++ )
|
||||
|
@ -1184,11 +1185,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
filterLocalityDataForPolicy(logSystem->tLogs[1]->tLogPolicy, &logSystem->tLogs[1]->tLogLocalities);
|
||||
|
||||
for( Tag tag : oldLogSystem->getEpochEndTags() ) {
|
||||
minTag = std::min(minTag, tag.id);
|
||||
locations.clear();
|
||||
logSystem->tLogs[1]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
|
||||
for(int loc : locations)
|
||||
sreqs[ loc ].recoverTags.push_back( tag );
|
||||
if(tag.locality != tagLocalityLogRouter) {
|
||||
locations.clear();
|
||||
logSystem->tLogs[1]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
|
||||
for(int loc : locations)
|
||||
sreqs[ loc ].recoverTags.push_back( tag );
|
||||
}
|
||||
}
|
||||
|
||||
for( int i = 0; i < recr.satelliteTLogs.size(); i++ )
|
||||
|
@ -1222,13 +1224,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
if(configuration.remoteTLogReplicationFactor > 0) {
|
||||
logSystem->hasRemoteServers = true;
|
||||
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, minTag, remoteLocality);
|
||||
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, remoteLocality);
|
||||
} else {
|
||||
logSystem->hasRemoteServers = false;
|
||||
logSystem->remoteRecovery = logSystem->recoveryComplete;
|
||||
logSystem->remoteRecoveryComplete = logSystem->recoveryComplete;
|
||||
}
|
||||
|
||||
for(auto tag : oldRouterTags) {
|
||||
logSystem->pop(oldLogSystem->epochEndVersion.get() + 1, tag);
|
||||
}
|
||||
|
||||
return logSystem;
|
||||
}
|
||||
|
||||
|
|
|
@ -1011,7 +1011,7 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
|
|||
state DBCoreState newState;
|
||||
self->logSystem->toCoreState( newState );
|
||||
newState.recoveryCount = recoverCount;
|
||||
|
||||
state Future<Void> changed = self->logSystem->onCoreStateChanged();
|
||||
ASSERT( newState.tLogs[0].tLogWriteAntiQuorum == self->configuration.tLogWriteAntiQuorum && newState.tLogs[0].tLogReplicationFactor == self->configuration.tLogReplicationFactor );
|
||||
|
||||
state bool finalUpdate = !newState.oldTLogData.size() && newState.tLogs.size() == self->configuration.expectedLogSets();
|
||||
|
@ -1037,7 +1037,7 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
|
|||
return Void();
|
||||
}
|
||||
|
||||
Void _ = wait( self->logSystem->onCoreStateChanged() );
|
||||
Void _ = wait( changed );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -147,7 +147,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
|
|||
//TraceEvent("ConfigureTestConfigureBegin").detail("newConfig", newConfig);
|
||||
int redundancy = g_random->randomInt( 0, sizeof(redundancies)/sizeof(redundancies[0]));
|
||||
std::string config = redundancies[redundancy];
|
||||
if(config == "triple" && g_simulator.physicalDatacenters > 2) {
|
||||
if(config == "triple" && g_simulator.physicalDatacenters == 3) {
|
||||
config = "three_data_hall";
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue