avoid peeking from logs that do not match the tag’s locality

This commit is contained in:
Evan Tschannen 2018-06-01 18:42:48 -07:00
parent 0e699a3c23
commit c519339adb
6 changed files with 72 additions and 61 deletions

View File

@ -364,7 +364,7 @@ ACTOR Future<Void> logRouterCore(
when( Void _ = wait( dbInfoChange ) ) {
dbInfoChange = db->onChange();
logRouterData.allowPops = db->get().recoveryState == 7;
logRouterData.logSystem->set(ILogSystem::fromServerDBInfo( logRouterData.dbgid, db->get() ));
logRouterData.logSystem->set(ILogSystem::fromServerDBInfo( logRouterData.dbgid, db->get(), true ));
}
when( TLogPeekRequest req = waitNext( interf.peekMessages.getFuture() ) ) {
addActor.send( logRouterPeekMessages( &logRouterData, req ) );

View File

@ -473,8 +473,8 @@ struct ILogSystem {
virtual Future<Void> endEpoch() = 0;
// Ends the current epoch without starting a new one
static Reference<ILogSystem> fromServerDBInfo( UID const& dbgid, struct ServerDBInfo const& db );
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, struct LocalityData const&, struct LogSystemConfig const&, bool excludeRemote = false );
static Reference<ILogSystem> fromServerDBInfo( UID const& dbgid, struct ServerDBInfo const& db, bool usePreviousEpochEnd = false );
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, struct LocalityData const&, struct LogSystemConfig const&, bool excludeRemote = false, bool usePreviousEpochEnd = false );
// Constructs a new ILogSystem implementation from the given ServerDBInfo/LogSystemConfig. Might return a null reference if there isn't a fully recovered log system available.
// The caller can peek() the returned log system and can push() if it has version numbers reserved for it and prevVersions

View File

@ -157,6 +157,7 @@ struct LogSystemConfig {
int32_t expectedLogSets;
UID recruitmentID;
bool stopped;
Optional<Version> previousEpochEndVersion;
LogSystemConfig() : logSystemType(0), logRouterTags(0), expectedLogSets(0), stopped(false) {}
@ -217,7 +218,7 @@ struct LogSystemConfig {
bool operator == ( const LogSystemConfig& rhs ) const { return isEqual(rhs); }
bool isEqual(LogSystemConfig const& r) const {
return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && expectedLogSets == r.expectedLogSets && logRouterTags == r.logRouterTags && recruitmentID == r.recruitmentID && stopped == r.stopped;
return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && expectedLogSets == r.expectedLogSets && logRouterTags == r.logRouterTags && recruitmentID == r.recruitmentID && stopped == r.stopped && previousEpochEndVersion == r.previousEpochEndVersion;
}
bool isEqualIds(LogSystemConfig const& r) const {
@ -248,7 +249,7 @@ struct LogSystemConfig {
template <class Ar>
void serialize( Ar& ar ) {
ar & logSystemType & tLogs & logRouterTags & oldTLogs & expectedLogSets & recruitmentID & stopped;
ar & logSystemType & tLogs & logRouterTags & oldTLogs & expectedLogSets & recruitmentID & stopped & previousEpochEndVersion;
}
};

View File

@ -1191,7 +1191,6 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get());
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, false);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true);
onError = onError || commitData.logSystem->onError();
addActor.send(transactionStarter(proxy, master, db, addActor, &commitData));
addActor.send(readRequestServer(proxy, &commitData));

View File

@ -1822,7 +1822,7 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
logSystem->set(ILogSystem::fromOldLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig ));
found = true;
} else if( self->dbInfo->get().logSystemConfig.isEqualIds(recoverFrom) ) {
logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig ));
logSystem->set(ILogSystem::fromLogSystemConfig( logData->logId, self->dbInfo->get().myLocality, self->dbInfo->get().logSystemConfig, false, true ));
found = true;
}
else if( self->dbInfo->get().recoveryState >= RecoveryState::FULLY_RECOVERED ) {
@ -1909,17 +1909,16 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
logData->initialized = true;
self->newLogData.trigger();
if(req.isPrimary && logData->unrecoveredBefore <= req.knownCommittedVersion && !logData->stopped) {
logData->logRouterPopToVersion = req.knownCommittedVersion;
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);
Void _ = wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.knownCommittedVersion, true) || logData->removed);
}
TraceEvent("TLogPullComplete", self->dbgid).detail("logId", logData->logId);
if(req.isPrimary && !req.recoverTags.empty() && !logData->stopped && req.knownCommittedVersion < req.recoverAt) {
Void _ = wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed);
if(req.isPrimary && !logData->stopped && logData->unrecoveredBefore <= req.recoverAt) {
if(req.recoverFrom.logRouterTags > 0 && req.locality != tagLocalityInvalid) {
logData->logRouterPopToVersion = req.recoverAt;
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);
Void _ = wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed);
} else if(!req.recoverTags.empty()) {
ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion);
Void _ = wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed);
}
}
if(req.isPrimary && logData->version.get() < req.recoverAt && !logData->stopped) {

View File

@ -75,11 +75,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Future<Void> remoteRecovery;
Future<Void> remoteRecoveryComplete;
std::vector<LogLockInfo> lockResults;
bool recoveryCompleteWrittenToCoreState;
AsyncVar<bool> recoveryCompleteWrittenToCoreState;
bool remoteLogsWrittenToCoreState;
bool hasRemoteServers;
Optional<Version> epochEndVersion;
Optional<Version> previousEpochEndVersion;
Version knownCommittedVersion;
LocalityData locality;
std::map< std::pair<UID, Tag>, std::pair<Version, Version> > outstandingPops; // For each currently running popFromLog actor, (log server #, tag)->popped version
@ -120,7 +121,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return epochEnd( outLogSystem, dbgid, oldState, rejoins, locality );
}
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf, bool excludeRemote ) {
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf, bool excludeRemote, bool usePreviousEpochEnd ) {
ASSERT( lsConf.logSystemType == 2 || (lsConf.logSystemType == 0 && !lsConf.tLogs.size()) );
//ASSERT(lsConf.epoch == epoch); //< FIXME
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality) );
@ -130,6 +131,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->logRouterTags = lsConf.logRouterTags;
logSystem->recruitmentID = lsConf.recruitmentID;
logSystem->stopped = lsConf.stopped;
if(usePreviousEpochEnd) {
logSystem->previousEpochEndVersion = lsConf.previousEpochEndVersion;
}
for( int i = 0; i < lsConf.tLogs.size(); i++ ) {
TLogSet const& tLogSet = lsConf.tLogs[i];
if(!excludeRemote || tLogSet.isLocal) {
@ -323,7 +327,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
virtual void coreStateWritten( DBCoreState const& newState ) {
if( !newState.oldTLogData.size() ) {
recoveryCompleteWrittenToCoreState = true;
recoveryCompleteWrittenToCoreState.set(true);
}
for(auto& t : newState.tLogs) {
if(!t.isLocal) {
@ -360,13 +364,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
}
for(auto& old : self->oldLogData) {
for(auto& it : old.tLogs) {
for(auto &t : it->logRouters) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
} else {
changes.push_back(t->onChange());
if(!self->recoveryCompleteWrittenToCoreState.get()) {
for(auto& old : self->oldLogData) {
for(auto& it : old.tLogs) {
for(auto &t : it->logRouters) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
} else {
changes.push_back(t->onChange());
}
}
}
}
@ -376,9 +383,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
changes.push_back(self->remoteRecovery);
}
if(!changes.size()) {
changes.push_back(Never()); //waiting on an empty vector will return immediately
}
changes.push_back(self->recoveryCompleteWrittenToCoreState.onChange());
ASSERT( failed.size() >= 1 );
Void _ = wait( quorum(changes, 1) || tagError<Void>( quorum( failed, 1 ), master_tlog_failed() ) || self->actors.getResult() );
@ -414,19 +419,20 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<Reference<LogSet>> localSets;
Version lastBegin = 0;
for(auto& log : tLogs) {
if(log->isLocal && log->logServers.size()) {
if(log->isLocal && log->logServers.size() && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || tag.locality == tagLocalityLogRouter)) {
lastBegin = std::max(lastBegin, log->startVersion);
localSets.push_back(log);
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
if(log->hasBestPolicy) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
}
if(log->hasBestPolicy && bestSet == -1) {
nextBestSet = localSets.size()-1;
}
}
}
if(!localSets.size()) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
if(begin >= lastBegin) {
TraceEvent("TLogPeekAllCurrentOnly", dbgid).detail("tag", tag.toString()).detail("begin", begin).detail("end", end).detail("bestLogs", bestSet >= 0 ? localSets[bestSet]->logServerString() : "no best set");
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
@ -459,18 +465,25 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<Reference<LogSet>> localOldSets;
Version thisBegin = begin;
for(auto& log : oldLogData[i].tLogs) {
if(log->isLocal && log->logServers.size()) {
if(log->isLocal && log->logServers.size() && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded || tag.locality == tagLocalityLogRouter)) {
thisBegin = std::max(thisBegin, log->startVersion);
localOldSets.push_back(log);
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
if(log->hasBestPolicy) {
bestOldSet = localOldSets.size()-1;
nextBestOldSet = bestOldSet;
}
if(log->hasBestPolicy && bestOldSet == -1) {
nextBestOldSet = localOldSets.size()-1;
}
}
}
if(!localOldSets.size()) {
TraceEvent("TLogPeekNoLocalSets", dbgid).detail("tag", tag.toString()).detail("begin", begin).detail("end", end).detail("lastBegin", lastBegin);
if(throwIfDead) {
throw worker_removed();
} else {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
}
if(thisBegin < lastBegin) {
if(thisBegin < end) {
TraceEvent("TLogPeekAllAddingOld", dbgid).detail("tag", tag.toString()).detail("begin", begin).detail("end", end)
@ -490,7 +503,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Reference<IPeekCursor> peekRemote( UID dbgid, Version begin, Tag tag, bool parallelGetMore ) {
int bestSet = -1;
Version lastBegin = 0;
Version lastBegin = previousEpochEndVersion.present() ? previousEpochEndVersion.get() + 1 : 0;
for(int t = 0; t < tLogs.size(); t++) {
if(tLogs[t]->isLocal) {
lastBegin = std::max(lastBegin, tLogs[t]->startVersion);
@ -703,13 +716,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(log->isLocal && log->logServers.size()) {
TraceEvent("TLogPeekLogRouterLocalSet", dbgid).detail("tag", tag.toString()).detail("begin", begin).detail("logServers", log->logServerString());
localSets.push_back(log);
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
if(log->hasBestPolicy) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
}
if(log->hasBestPolicy && bestSet == -1) {
nextBestSet = localSets.size()-1;
}
}
}
@ -726,6 +736,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
}
bool firstOld = true;
for(auto& old : oldLogData) {
found = false;
for( auto& log : old.tLogs ) {
@ -747,21 +758,19 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(log->isLocal && log->logServers.size()) {
TraceEvent("TLogPeekLogRouterOldLocalSet", dbgid).detail("tag", tag.toString()).detail("begin", begin).detail("logServers", log->logServerString());
localSets.push_back(log);
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
if(log->hasBestPolicy) {
bestSet = localSets.size()-1;
nextBestSet = bestSet;
}
if(log->hasBestPolicy && bestSet == -1) {
nextBestSet = localSets.size()-1;
}
}
}
TraceEvent("TLogPeekLogRouterOldSets", dbgid).detail("tag", tag.toString()).detail("begin", begin).detail("oldEpoch", old.epochEnd);
TraceEvent("TLogPeekLogRouterOldSets", dbgid).detail("tag", tag.toString()).detail("begin", begin).detail("oldEpoch", old.epochEnd).detail("previousEpochEndVersion", previousEpochEndVersion.present() ? previousEpochEndVersion.get() : -1).detail("firstOld", firstOld);
//FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, old.epochEnd, false ) );
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, firstOld && previousEpochEndVersion.present() ? previousEpochEndVersion.get() + 1 : old.epochEnd, false ) );
}
firstOld = false;
}
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
@ -928,6 +937,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystemConfig.logRouterTags = logRouterTags;
logSystemConfig.recruitmentID = recruitmentID;
logSystemConfig.stopped = stopped;
logSystemConfig.previousEpochEndVersion = previousEpochEndVersion;
for( int i = 0; i < tLogs.size(); i++ ) {
Reference<LogSet> logSet = tLogs[i];
if(logSet->isLocal || remoteLogsWrittenToCoreState) {
@ -952,7 +962,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if(!recoveryCompleteWrittenToCoreState) {
if(!recoveryCompleteWrittenToCoreState.get()) {
for( int i = 0; i < oldLogData.size(); i++ ) {
logSystemConfig.oldTLogs.push_back(OldTLogConf());
@ -994,7 +1004,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
}
if(!recoveryCompleteWrittenToCoreState) {
if(!recoveryCompleteWrittenToCoreState.get()) {
for( int i = 0; i < oldLogData.size(); i++ ) {
for(auto& t : oldLogData[i].tLogs) {
for( int j = 0; j < t->logServers.size(); j++ ) {
@ -1590,6 +1600,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) );
logSystem->logSystemType = 2;
logSystem->expectedLogSets = 1;
logSystem->previousEpochEndVersion = oldLogSystem->epochEndVersion;
logSystem->recruitmentID = g_random->randomUniqueID();
oldLogSystem->recruitmentID = logSystem->recruitmentID;
@ -1660,8 +1671,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
state Future<Void> oldRouterRecruitment = Never();
TraceEvent("NewEpochStartVersion").detail("startVersion", logSystem->tLogs[0]->startVersion).detail("epochEnd", oldLogSystem->knownCommittedVersion + 1).detail("locality", primaryLocality);
if(logSystem->tLogs[0]->startVersion < oldLogSystem->knownCommittedVersion + 1) {
TraceEvent("NewEpochStartVersion", oldLogSystem->getDebugID()).detail("startVersion", logSystem->tLogs[0]->startVersion).detail("epochEnd", oldLogSystem->knownCommittedVersion + 1).detail("locality", primaryLocality).detail("oldLogRouterTags", oldLogSystem->logRouterTags);
if(oldLogSystem->logRouterTags > 0 || logSystem->tLogs[0]->startVersion < oldLogSystem->knownCommittedVersion + 1) {
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters(oldLogSystem.getPtr(), recr.oldLogRouters, recoveryCount, primaryLocality, logSystem->tLogs[0]->startVersion, localities, logSystem->tLogs[0]->tLogPolicy, logSystem->tLogs[0]->hasBestPolicy, false);
} else {
oldLogSystem->logSystemConfigChanged.trigger();
@ -1737,7 +1748,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->tLogs[1]->logServers.resize( recr.satelliteTLogs.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[1]->updateLocalitySet(satelliteLocalities);
for( Tag tag : allTags ) {
for(int i = 0; i < oldLogSystem->logRouterTags; i++) {
Tag tag(tagLocalityLogRouter, i);
locations.clear();
logSystem->tLogs[1]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
for(int loc : locations)
@ -2019,11 +2031,11 @@ Future<Void> ILogSystem::recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSys
return TagPartitionedLogSystem::recoverAndEndEpoch( outLogSystem, dbgid, oldState, rejoins, locality );
}
Reference<ILogSystem> ILogSystem::fromLogSystemConfig( UID const& dbgid, struct LocalityData const& locality, struct LogSystemConfig const& conf, bool excludeRemote ) {
Reference<ILogSystem> ILogSystem::fromLogSystemConfig( UID const& dbgid, struct LocalityData const& locality, struct LogSystemConfig const& conf, bool excludeRemote, bool usePreviousEpochEnd ) {
if (conf.logSystemType == 0)
return Reference<ILogSystem>();
else if (conf.logSystemType == 2)
return TagPartitionedLogSystem::fromLogSystemConfig( dbgid, locality, conf, excludeRemote );
return TagPartitionedLogSystem::fromLogSystemConfig( dbgid, locality, conf, excludeRemote, usePreviousEpochEnd );
else
throw internal_error();
}
@ -2037,6 +2049,6 @@ Reference<ILogSystem> ILogSystem::fromOldLogSystemConfig( UID const& dbgid, stru
throw internal_error();
}
Reference<ILogSystem> ILogSystem::fromServerDBInfo( UID const& dbgid, ServerDBInfo const& dbInfo ) {
return fromLogSystemConfig( dbgid, dbInfo.myLocality, dbInfo.logSystemConfig );
Reference<ILogSystem> ILogSystem::fromServerDBInfo( UID const& dbgid, ServerDBInfo const& dbInfo, bool usePreviousEpochEnd ) {
return fromLogSystemConfig( dbgid, dbInfo.myLocality, dbInfo.logSystemConfig, false, usePreviousEpochEnd );
}