fix: new tlogs are initialized with exactly the tags which existed at the recovery version
commit 73597f190e
parent a520d03397
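In short (as the hunks below show): the recovered-tag bookkeeping moves from the tLogs to the master. TLogLockResult no longer carries a tags vector, the log system no longer collects epochEndTags from lock replies or exposes getEpochEndTags(), and data distribution's popOldTags actor is deleted. Instead, readTransactionSystemState reads serverTagKeys and serverTagHistoryKeys from the recovered txnStateStore, builds a deduplicated allTags list that always includes txsTag, and passes it through ILogSystem::newEpoch into TagPartitionedLogSystem::newEpoch and newRemoteEpoch, so newly recruited tlogs are initialized with exactly the tags which existed at the recovery version.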
@@ -2042,41 +2042,6 @@ static std::set<int> const& normalDDQueueErrors() {
 	return s;
 }
 
-ACTOR Future<Void> popOldTags( Database cx, Reference<ILogSystem> logSystem, Version recoveryCommitVersion ) {
-	state Transaction tr(cx);
-
-	if( recoveryCommitVersion == 1 )
-		return Void();
-
-	loop {
-		try {
-			state Future<Standalone<RangeResultRef>> fTags = tr.getRange( serverTagKeys, CLIENT_KNOBS->TOO_MANY );
-			state Future<Standalone<RangeResultRef>> fHistoryTags = tr.getRange( serverTagHistoryKeys, CLIENT_KNOBS->TOO_MANY );
-
-			Void _ = wait( success(fTags) && success(fHistoryTags) );
-
-			state std::set<Tag> tags;
-
-			for(auto& kv : fTags.get()) {
-				tags.insert(decodeServerTagValue( kv.value ));
-			}
-
-			for(auto& kv : fHistoryTags.get()) {
-				tags.insert(decodeServerTagValue( kv.value ));
-			}
-
-			for(auto tag : logSystem->getEpochEndTags()) {
-				if(!tags.count(tag)) {
-					logSystem->pop(recoveryCommitVersion, tag);
-				}
-			}
-			return Void();
-		} catch( Error &e ) {
-			Void _ = wait( tr.onError(e) );
-		}
-	}
-}
-
 ACTOR Future<Void> pollMoveKeysLock( Database cx, MoveKeysLock lock ) {
 	loop {
 		Void _ = wait(delay(SERVER_KNOBS->MOVEKEYS_LOCK_POLLING_DELAY));
@@ -2219,7 +2184,6 @@ ACTOR Future<Void> dataDistribution(
 	}
 
 	actors.push_back( pollMoveKeysLock(cx, lock) );
-	actors.push_back( popOldTags( cx, logSystem, recoveryCommitVersion) );
 	actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
 	actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, configuration.durableStorageQuorum, lastLimited ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
 	actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.remoteTLogReplicationFactor > 0 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
@@ -467,7 +467,7 @@ struct ILogSystem {
 		// Call only on an ILogSystem obtained from recoverAndEndEpoch()
 		// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable)
 
-	virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality ) = 0;
+	virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags ) = 0;
 		// Call only on an ILogSystem obtained from recoverAndEndEpoch()
 		// Returns an ILogSystem representing a new epoch immediately following this one. The new epoch is only provisional until the caller updates the coordinated DBCoreState
 
@@ -488,9 +488,6 @@ struct ILogSystem {
 	virtual Tag getRandomRouterTag() = 0;
 
 	virtual void stopRejoins() = 0;
-
-	virtual const std::set<Tag>& getEpochEndTags() = 0;
-		// Returns the list of tags recovered from the previous generation of logs. This is not initialized on log systems created with fromServerDBInfo, fromLogSystemConfig, or fromOldLogSystemConfig.
 };
 
 struct LengthPrefixedStringRef {
@@ -475,10 +475,8 @@ namespace oldTLog {
 		TLogLockResult result;
 		result.end = stopVersion;
 		result.knownCommittedVersion = logData->knownCommittedVersion;
-		for( auto & tag : logData->tag_data )
-			result.tags.push_back( convertOldTag(tag.key) );
 
-		TraceEvent("TLogStop2", self->dbgid).detail("logId", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get()).detail("tags", describe(result.tags));
+		TraceEvent("TLogStop2", self->dbgid).detail("logId", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get());
 
 
 		reply.send( result );
@@ -82,11 +82,10 @@ struct TLogRecoveryFinishedRequest {
 struct TLogLockResult {
 	Version end;
 	Version knownCommittedVersion;
-	std::vector<Tag> tags;
 
 	template <class Ar>
 	void serialize( Ar& ar ) {
-		ar & end & knownCommittedVersion & tags;
+		ar & end & knownCommittedVersion;
 	}
 };
 
@@ -516,15 +516,7 @@ ACTOR Future<Void> tLogLock( TLogData* self, ReplyPromise< TLogLockResult > repl
 	result.end = stopVersion;
 	result.knownCommittedVersion = logData->knownCommittedVersion;
 
-	for(int tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
-		for(int tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
-			if(logData->tag_data[tag_locality][tag_id] && logData->tag_data[tag_locality][tag_id]->tag.locality != tagLocalityLogRouter && (!logData->tag_data[tag_locality][tag_id]->nothing_persistent || logData->tag_data[tag_locality][tag_id]->version_messages.size() || logData->tag_data[tag_locality][tag_id]->unpoppedRecovered)) {
-				result.tags.push_back(logData->tag_data[tag_locality][tag_id]->tag);
-			}
-		}
-	}
-
-	TraceEvent("TLogStop2", self->dbgid).detail("logId", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get()).detail("tags", describe(result.tags));
+	TraceEvent("TLogStop2", self->dbgid).detail("logId", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get());
 
 	reply.send( result );
 	return Void();
@@ -79,7 +79,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 	bool hasRemoteServers;
 
 	Optional<Version> epochEndVersion;
-	std::set<Tag> epochEndTags;
 	Version knownCommittedVersion;
 	LocalityData locality;
 	std::map< std::pair<UID, Tag>, std::pair<Version, Version> > outstandingPops;  // For each currently running popFromLog actor, (log server #, tag)->popped version
@@ -793,10 +792,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		return waitForAll(lockResults);
 	}
 
-	virtual Future<Reference<ILogSystem>> newEpoch( RecruitFromConfigurationReply const& recr, Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality ) {
+	virtual Future<Reference<ILogSystem>> newEpoch( RecruitFromConfigurationReply const& recr, Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags ) {
 		// Call only after end_epoch() has successfully completed. Returns a new epoch immediately following this one. The new epoch
 		// is only provisional until the caller updates the coordinated DBCoreState
-		return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality );
+		return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality, allTags );
 	}
 
 	virtual LogSystemConfig getLogSystemConfig() {
@@ -940,10 +939,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		return Tag(tagLocalityLogRouter, g_random->randomInt(0, logRouterTags));
 	}
 
-	virtual const std::set<Tag>& getEpochEndTags() {
-		return epochEndTags;
-	}
-
 	ACTOR static Future<Void> monitorLog(Reference<AsyncVar<OptionalInterface<TLogInterface>>> logServer, Reference<AsyncVar<bool>> failed) {
 		state Future<Void> waitFailure;
 		loop {
@@ -1321,16 +1316,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 			logSystem->knownCommittedVersion = knownCommittedVersion;
 			logSystem->remoteLogsWrittenToCoreState = true;
 
-			for(int log = 0; log < logServers.size(); log++) {
-				if(lockResults[log].logSet->isLocal) {
-					for(auto& r : lockResults[log].replies) {
-						if( r.isReady() && !r.isError() ) {
-							logSystem->epochEndTags.insert( r.get().tags.begin(), r.get().tags.end() );
-						}
-					}
-				}
-			}
-
 			outLogSystem->set(logSystem);
 		}
 
@@ -1476,8 +1461,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		return Void();
 	}
 
-	ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality )
-	{
+	ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality, std::vector<Tag> allTags ) {
 		TraceEvent("RemoteLogRecruitment_WaitingForWorkers");
 		state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );
 
@@ -1531,7 +1515,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 
 		vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() );
 
-		std::vector<Tag> allTags(self->epochEndTags.begin(), self->epochEndTags.end());
 		for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) {
 			InitializeTLogRequest &req = remoteTLogReqs[i];
 			req.recruitmentID = self->recruitmentID;
@@ -1574,13 +1557,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		return Void();
 	}
 
-	ACTOR static Future<Reference<ILogSystem>> newEpoch( Reference<TagPartitionedLogSystem> oldLogSystem, RecruitFromConfigurationReply recr, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality )
-	{
+	ACTOR static Future<Reference<ILogSystem>> newEpoch( Reference<TagPartitionedLogSystem> oldLogSystem, RecruitFromConfigurationReply recr, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> allTags ) {
 		state double startTime = now();
 		state Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) );
 		logSystem->logSystemType = 2;
 		logSystem->expectedLogSets = 1;
-		logSystem->epochEndTags = oldLogSystem->getEpochEndTags();
 		logSystem->recruitmentID = g_random->randomUniqueID();
 		oldLogSystem->recruitmentID = logSystem->recruitmentID;
 
@@ -1648,7 +1629,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		state vector<Future<TLogInterface>> initializationReplies;
 		vector< InitializeTLogRequest > reqs( recr.tLogs.size() );
 
-		std::vector<Tag> allTags(logSystem->epochEndTags.begin(), logSystem->epochEndTags.end());
 		for( int i = 0; i < recr.tLogs.size(); i++ ) {
 			InitializeTLogRequest &req = reqs[i];
 			req.recruitmentID = logSystem->recruitmentID;
@@ -1671,7 +1651,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 		filterLocalityDataForPolicy(logSystem->tLogs[0]->tLogPolicy, &logSystem->tLogs[0]->tLogLocalities);
 
 		std::vector<int> locations;
-		for( Tag tag : oldLogSystem->getEpochEndTags() ) {
+		for( Tag tag : allTags ) {
 			locations.clear();
 			logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
 			for(int loc : locations)
@@ -1708,7 +1688,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 			logSystem->tLogs[1]->updateLocalitySet(recr.satelliteTLogs);
 			filterLocalityDataForPolicy(logSystem->tLogs[1]->tLogPolicy, &logSystem->tLogs[1]->tLogLocalities);
 
-			for( Tag tag : oldLogSystem->getEpochEndTags() ) {
+			for( Tag tag : allTags ) {
 				locations.clear();
 				logSystem->tLogs[1]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
 				for(int loc : locations)
@@ -1746,7 +1726,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 
 		if(configuration.remoteTLogReplicationFactor > 0) {
 			logSystem->hasRemoteServers = true;
-			logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, remoteLocality);
+			logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, remoteLocality, allTags);
 		} else {
 			logSystem->hasRemoteServers = false;
 			logSystem->remoteRecovery = logSystem->recoveryComplete;
@@ -185,6 +185,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
 	IKeyValueStore* txnStateStore;
 	int64_t memoryLimit;
 	std::map<Optional<Value>,int8_t> dcId_locality;
+	std::vector<Tag> allTags;
 
 	int8_t getNextLocality() {
 		int8_t maxLocality = -1;
@@ -310,10 +311,10 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
 
 		Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() ) ) );
 
-		Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId] ) );
+		Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->dcId_locality[recr.dcId], self->dcId_locality[remoteDcId], self->allTags ) );
 		self->logSystem = newLogSystem;
 	} else {
-		Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial ) );
+		Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, tagLocalitySpecial, tagLocalitySpecial, self->allTags ) );
 		self->logSystem = newLogSystem;
 	}
 	return Void();
@@ -627,6 +628,20 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
 		self->dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
 	}
 
+	Standalone<VectorRef<KeyValueRef>> rawTags = wait( self->txnStateStore->readRange( serverTagKeys ) );
+	self->allTags.clear();
+	self->allTags.push_back(txsTag);
+	for(auto& kv : rawTags) {
+		self->allTags.push_back(decodeServerTagValue( kv.value ));
+	}
+
+	Standalone<VectorRef<KeyValueRef>> rawHistoryTags = wait( self->txnStateStore->readRange( serverTagHistoryKeys ) );
+	for(auto& kv : rawHistoryTags) {
+		self->allTags.push_back(decodeServerTagValue( kv.value ));
+	}
+
+	uniquify(self->allTags);
+
 	//auto kvs = self->txnStateStore->readRange( systemKeys );
 	//for( auto & kv : kvs.get() )
 	//	TraceEvent("MasterRecoveredTXS", self->dbgid).detail("K", printable(kv.key)).detail("V", printable(kv.value));
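For illustration only, a small self-contained sketch of the tag-collection step added above. The Tag struct, txsTag constant, and decodeServerTagValue below are simplified stand-ins rather than FoundationDB's real definitions, and uniquify() is approximated with sort + unique; only the collect-then-deduplicate flow mirrors the commit.

// collect_all_tags.cpp -- hypothetical, simplified illustration of the new
// readTransactionSystemState logic; the types below are stand-ins, not FDB's.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <tuple>
#include <vector>

struct Tag {
	int8_t locality;
	uint16_t id;
	bool operator<(const Tag& r) const { return std::tie(locality, id) < std::tie(r.locality, r.id); }
	bool operator==(const Tag& r) const { return locality == r.locality && id == r.id; }
};

// Placeholder value; the real txsTag constant is defined in FDB's Tag header.
static const Tag txsTag{ -1, 0 };

// Stand-in for decodeServerTagValue(): here the stored "value" is already a Tag.
static Tag decodeServerTagValue(const Tag& v) { return v; }

// Mirrors the new master-side step: current server tags plus historical server
// tags, plus txsTag, deduplicated, become the allTags list handed to newEpoch().
std::vector<Tag> collectAllTags(const std::vector<Tag>& serverTagValues,
                                const std::vector<Tag>& serverTagHistoryValues) {
	std::vector<Tag> allTags;
	allTags.push_back(txsTag);
	for (auto& v : serverTagValues) allTags.push_back(decodeServerTagValue(v));
	for (auto& v : serverTagHistoryValues) allTags.push_back(decodeServerTagValue(v));
	std::sort(allTags.begin(), allTags.end());                                  // uniquify() is
	allTags.erase(std::unique(allTags.begin(), allTags.end()), allTags.end()); // sort + unique here
	return allTags;
}

int main() {
	// Two current server tags plus one historical tag that duplicates a current one.
	std::vector<Tag> current = { { 0, 1 }, { 0, 2 } };
	std::vector<Tag> history = { { 0, 2 } };
	std::printf("%zu tags\n", collectAllTags(current, history).size()); // prints "3 tags"
}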