fixed a large number of problems related to running without remote logs
parent 316e200a0c
commit 8f58bdd1cd
@@ -684,7 +684,7 @@ public:
 	}

 	RecruitFromConfigurationReply findWorkersForConfiguration( RecruitFromConfigurationRequest const& req ) {
 		if(req.configuration.remoteTLogReplicationFactor > 0) {
 			if(req.configuration.primaryDcId.present()) {
 				try {
 					return findWorkersForConfiguration(req, req.configuration.primaryDcId);
 				} catch( Error& e ) {
@@ -90,16 +90,15 @@ bool DatabaseConfiguration::isValid() const {
 		tLogPolicy &&
 		remoteDesiredTLogCount >= 0 &&
 		remoteTLogReplicationFactor >= 0 &&
-		remoteTLogPolicy &&
-		( remoteTLogReplicationFactor == 0 || ( primaryDcId.present() && remoteDcId.present() && remoteDurableStorageQuorum >= 1 && logRouterCount >= 1 ) ) &&
-		remoteStoragePolicy &&
+		( remoteTLogReplicationFactor == 0 || ( remoteStoragePolicy && remoteTLogPolicy && primaryDcId.present() && remoteDcId.present() && remoteDurableStorageQuorum >= 1 && logRouterCount >= 1 ) ) &&
 		primaryDcId.present() == remoteDcId.present() &&
 		remoteDurableStorageQuorum <= remoteStorageTeamSize &&
 		satelliteDesiredTLogCount >= 0 &&
 		satelliteTLogReplicationFactor >= 0 &&
 		satelliteTLogWriteAntiQuorum >= 0 &&
 		satelliteTLogUsableDcs >= 0 &&
-		( satelliteTLogReplicationFactor == 0 || ( primarySatelliteDcIds.size() && remoteSatelliteDcIds.size() && remoteTLogReplicationFactor > 0 ) ) &&
-		satelliteTLogPolicy &&
+		( satelliteTLogReplicationFactor == 0 || ( satelliteTLogPolicy && primarySatelliteDcIds.size() && remoteSatelliteDcIds.size() && remoteTLogReplicationFactor > 0 ) ) &&
 		primarySatelliteDcIds.size() == remoteSatelliteDcIds.size() &&
 		logRouterCount >= 0;
 }
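A minimal standalone sketch of the rule the hunk above encodes: the remote (and, analogously, satellite) policy and DC-id requirements are now folded inside the replication-factor guard, so a configuration running without remote logs (remoteTLogReplicationFactor == 0) passes the remote clause with none of the remote fields set. The Config struct and remoteClauseValid() below are illustrative stand-ins, not the real DatabaseConfiguration.

#include <cstdint>
#include <memory>

// Illustrative stand-in types; the real code uses DatabaseConfiguration and replication policies.
struct Policy {};

struct Config {
	int32_t remoteTLogReplicationFactor = 0;
	int32_t remoteDurableStorageQuorum = 0;
	int32_t logRouterCount = 0;
	bool primaryDcIdPresent = false;
	bool remoteDcIdPresent = false;
	std::shared_ptr<Policy> remoteTLogPolicy;
	std::shared_ptr<Policy> remoteStoragePolicy;

	// Mirrors the reworked clause: remote policies and DC ids are only mandatory
	// when remote logging is actually configured (replication factor > 0).
	bool remoteClauseValid() const {
		return remoteTLogReplicationFactor == 0 ||
		       ( remoteStoragePolicy && remoteTLogPolicy &&
		         primaryDcIdPresent && remoteDcIdPresent &&
		         remoteDurableStorageQuorum >= 1 && logRouterCount >= 1 );
	}
};

int main() {
	Config local;                              // no remote logs: nothing else is required
	Config remote;
	remote.remoteTLogReplicationFactor = 2;    // remote logs requested, but fields left unset
	return ( local.remoteClauseValid() && !remote.remoteClauseValid() ) ? 0 : 1;
}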
@@ -623,7 +623,7 @@ ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer
 				}
 				bestSetValid = self->localityGroup.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->localityGroup.validate(self->logSets[self->bestSet]->tLogPolicy);
 			}
-			if(bestSetValid) {
+			if(bestSetValid || self->logSets.size() == 1) {
 				if(!self->useBestSet) {
 					self->useBestSet = true;
 					self->calcHasMessage();
@@ -1174,6 +1174,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS

 		if(configuration.remoteTLogReplicationFactor > 0) {
 			logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, minTag, remoteLocality);
+		} else {
+			logSystem->remoteRecovery = Void();
+			logSystem->remoteRecoveryComplete = Void();
 		}

 		return logSystem;
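The new else branch marks remote recovery as already finished when no remote logs are configured: assigning Void() produces an immediately available Future<Void>, so anything that later waits on remoteRecovery or remoteRecoveryComplete proceeds at once instead of hanging. The snippet below shows the same ready-future pattern with std::promise/std::future as a standard-library analogy; it is not flow code.

#include <future>
#include <iostream>

int main() {
	// A future that is already fulfilled, so a later wait() returns immediately
	// instead of blocking on work that will never happen.
	std::promise<void> done;
	done.set_value();                          // "remote recovery" is complete by definition
	std::future<void> remoteRecovery = done.get_future();

	remoteRecovery.wait();                     // returns immediately; there is no remote epoch to wait for
	std::cout << "no remote logs configured, nothing to recover remotely\n";
}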
@@ -186,6 +186,14 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
 	int64_t memoryLimit;
 	std::map<Optional<Value>,int8_t> dcId_locality;

+	int8_t getNextLocality() {
+		int8_t maxLocality = -1;
+		for(auto it : dcId_locality) {
+			maxLocality = std::max(maxLocality, it.second);
+		}
+		return maxLocality + 1;
+	}
+
 	vector< MasterProxyInterface > proxies;
 	vector< MasterProxyInterface > provisionalProxies;
 	vector< ResolverInterface > resolvers;
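getNextLocality hands out the next unused tag locality as one more than the largest value already recorded in dcId_locality, so it yields 0 for an empty map and never reuses a previously assigned id even if the assignments have gaps. A standalone sketch of that behavior, with std::string keys standing in for the Optional<Value> DC ids:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Stand-in for MasterData::dcId_locality; keys are DC ids, values are tag localities.
static int8_t getNextLocality( const std::map<std::string, int8_t>& dcIdLocality ) {
	int8_t maxLocality = -1;
	for( const auto& it : dcIdLocality ) {
		maxLocality = std::max( maxLocality, it.second );
	}
	return maxLocality + 1;                    // 0 for an empty map, otherwise max + 1
}

int main() {
	std::map<std::string, int8_t> dcIdLocality;
	std::cout << (int)getNextLocality( dcIdLocality ) << "\n";   // 0: nothing assigned yet

	dcIdLocality["dc-west"] = 0;
	dcIdLocality["dc-east"] = 3;               // gaps are possible; only the maximum matters
	std::cout << (int)getNextLocality( dcIdLocality ) << "\n";   // 4
}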
@@ -280,16 +288,28 @@ ACTOR Future<Void> newResolvers( Reference<MasterData> self, RecruitFromConfigur
 	return Void();
 }

-ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfigurationReply recr, Reference<ILogSystem> oldLogSystem ) {
+ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfigurationReply recr, Reference<ILogSystem> oldLogSystem, vector<Standalone<CommitTransactionRef>>* initialConfChanges ) {
 	state Optional<Key> primaryDcId = recr.remoteDcId == self->configuration.remoteDcId ? self->configuration.primaryDcId : self->configuration.remoteDcId;
-	if( !self->dcId_locality.count(primaryDcId) || (self->configuration.remoteTLogReplicationFactor > 0 && !self->dcId_locality.count(recr.remoteDcId)) ) {
-		TraceEvent(SevWarnAlways, "UnknownDCID", self->dbgid).detail("primaryFound", self->dcId_locality.count(self->configuration.primaryDcId)).detail("remoteFound", self->dcId_locality.count(self->configuration.primaryDcId)).detail("primaryId", printable(self->configuration.primaryDcId)).detail("remoteId", printable(self->configuration.remoteDcId));
-		Void _ = wait( Future<Void>(Never()) );
+	if( !self->dcId_locality.count(primaryDcId) ) {
+		TraceEvent("UnknownPrimaryDCID", self->dbgid).detail("found", self->dcId_locality.count(primaryDcId)).detail("primaryId", printable(primaryDcId));
+		int8_t loc = self->getNextLocality();
+		Standalone<CommitTransactionRef> tr;
+		tr.set(tr.arena(), tagLocalityListKeyFor(primaryDcId), tagLocalityListValue(loc));
+		initialConfChanges->push_back(tr);
+		self->dcId_locality[primaryDcId] = loc;
+	}
+
+	if( self->configuration.remoteTLogReplicationFactor > 0 && !self->dcId_locality.count(recr.remoteDcId) ) {
+		TraceEvent("UnknownRemoteDCID", self->dbgid).detail("remoteFound", self->dcId_locality.count(recr.remoteDcId)).detail("remoteId", printable(recr.remoteDcId));
+		int8_t loc = self->getNextLocality();
+		Standalone<CommitTransactionRef> tr;
+		tr.set(tr.arena(), tagLocalityListKeyFor(recr.remoteDcId), tagLocalityListValue(loc));
+		initialConfChanges->push_back(tr);
+		self->dcId_locality[recr.remoteDcId] = loc;
 	}

 	Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = self->configuration.remoteTLogReplicationFactor > 0 ? brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, recr.remoteDcId ) ) ) : Never();

 	Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[primaryDcId], self->dcId_locality[recr.remoteDcId] ) );
 	self->logSystem = newLogSystem;
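Instead of stalling on wait(Never()) when a DC id has no known tag locality, the hunk above allocates the next locality, remembers it in dcId_locality, and pushes a tagLocalityListKeyFor mutation onto initialConfChanges so it can be persisted later in recovery. The sketch below models that flow only in outline: the pending write is a plain key/value pair with an assumed key prefix, whereas the real code builds a Standalone<CommitTransactionRef> with tagLocalityListKeyFor/tagLocalityListValue.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Pending configuration mutations, modeled here as key/value pairs instead of
// Standalone<CommitTransactionRef> entries.
using ConfChange = std::pair<std::string, int8_t>;

struct MasterState {
	std::map<std::string, int8_t> dcIdLocality;

	int8_t getNextLocality() const {
		int8_t maxLocality = -1;
		for( const auto& it : dcIdLocality ) maxLocality = std::max( maxLocality, it.second );
		return maxLocality + 1;
	}

	// Mirrors the new branch: an unknown DC id gets a fresh locality that is both
	// remembered locally and queued for recovery to persist.
	void ensureLocality( const std::string& dcId, std::vector<ConfChange>* initialConfChanges ) {
		if( !dcIdLocality.count( dcId ) ) {
			int8_t loc = getNextLocality();
			initialConfChanges->push_back( { "tagLocalityList/" + dcId, loc } );  // assumed key shape, for illustration only
			dcIdLocality[dcId] = loc;
		}
	}
};

int main() {
	MasterState self;
	std::vector<ConfChange> initialConfChanges;
	self.ensureLocality( "primary-dc", &initialConfChanges );
	self.ensureLocality( "remote-dc", &initialConfChanges );
	for( const auto& c : initialConfChanges )
		std::cout << c.first << " -> " << (int)c.second << "\n";
}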
@@ -495,7 +515,7 @@ ACTOR Future<Standalone<CommitTransactionRef>> provisionalMaster( Reference<Mast
 	}
 }

-ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<StorageServerInterface>* seedServers, Reference<ILogSystem> oldLogSystem ) {
+ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<StorageServerInterface>* seedServers, Reference<ILogSystem> oldLogSystem, vector<Standalone<CommitTransactionRef>>* initialConfChanges ) {
 	if (!self->configuration.isValid()) {
 		RecoveryStatus::RecoveryStatus status;
 		if (self->configuration.initialized)
@@ -542,7 +562,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
 	// Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand new database we are sort of lying that we are
 	// past the recruitment phase. In a perfect world we would split that up so that the recruitment part happens above (in parallel with recruiting the transaction servers?).
 	Void _ = wait( newSeedServers( self, recruits, seedServers ) );
-	Void _ = wait( newProxies( self, recruits ) && newResolvers( self, recruits ) && newTLogServers( self, recruits, oldLogSystem ) );
+	Void _ = wait( newProxies( self, recruits ) && newResolvers( self, recruits ) && newTLogServers( self, recruits, oldLogSystem, initialConfChanges ) );
 	return Void();
 }

@@ -696,7 +716,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
 	// Ordinarily we pass through this loop once and recover. We go around the loop if recovery stalls for more than a second,
 	// a provisional master is initialized, and an "emergency transaction" is submitted that might change the configuration so that we can
 	// finish recovery.
-	state Future<Void> recruitments = recruitEverything( self, seedServers, oldLogSystem );
+	state Future<Void> recruitments = recruitEverything( self, seedServers, oldLogSystem, initialConfChanges );
 	loop {
 		state Future<Standalone<CommitTransactionRef>> provisional = provisionalMaster(self, delay(1.0));

@@ -721,7 +741,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
 				initialConfChanges->push_back(req);

 				if(self->configuration != oldConf) { //confChange does not trigger when including servers
-					recruitments = recruitEverything( self, seedServers, oldLogSystem );
+					recruitments = recruitEverything( self, seedServers, oldLogSystem, initialConfChanges );
 				}
 			}
 		}