added a peek method which will attempt to read the txsTag from the local region as much as possible

This commit is contained in:
Evan Tschannen 2018-09-28 12:21:08 -07:00
parent a24eadd73a
commit 05e7f08b26
7 changed files with 167 additions and 23 deletions

View File

@ -624,6 +624,9 @@ struct ILogSystem {
// Same contract as peek(), but can only peek from the logs elected in the same generation.
// If the preferred log server is down, a different log from the same generation will merge results locally before sending them to the log router.
virtual Reference<IPeekCursor> peekSpecial( UID dbgid, Version begin, Tag tag, int8_t peekLocality ) = 0;
// Same contract as peek(), but it allows specifying a preferred peek locality for tags that do not have locality
virtual void pop( Version upTo, Tag tag, Version knownCommittedVersion = 0, int8_t popLocality = tagLocalityInvalid ) = 0;
// Permits, but does not require, the log subsystem to strip `tag` from any or all messages with message versions < (upTo,0)
// The popping of any given message may be arbitrarily delayed.

View File

@ -190,6 +190,62 @@ struct LogSystemConfig {
return results;
}
int8_t getLocalityForDcId(Optional<Key> dcId) const {
std::map<int8_t, int> matchingLocalities;
std::map<int8_t, int> allLocalities;
for( auto& tLogSet : tLogs ) {
for( auto& tLog : tLogSet.tLogs ) {
if( tLog.present() && tLogSet.locality >= 0 ) {
if( tLog.interf().locality.dcId() == dcId ) {
matchingLocalities[tLogSet.locality]++;
} else {
allLocalities[tLogSet.locality]++;
}
}
}
}
for(auto& oldLog : oldTLogs) {
for( auto& tLogSet : oldLog.tLogs ) {
for( auto& tLog : tLogSet.tLogs ) {
if( tLog.present() && tLogSet.locality >= 0 ) {
if( tLog.interf().locality.dcId() == dcId ) {
matchingLocalities[tLogSet.locality]++;
} else {
allLocalities[tLogSet.locality]++;
}
}
}
}
}
if(!matchingLocalities.empty()) {
int8_t bestLocality = -1;
int bestLocalityCount = -1;
for(auto& it : matchingLocalities) {
if(it.second > bestLocalityCount) {
bestLocality = it.first;
bestLocalityCount = it.second;
}
}
return bestLocality;
}
if(!allLocalities.empty()) {
int8_t bestLocality = -1;
int bestLocalityCount = -1;
for(auto& it : allLocalities) {
if(it.second > bestLocalityCount) {
bestLocality = it.first;
bestLocalityCount = it.second;
}
}
return bestLocality;
}
return tagLocalityInvalid;
}
std::vector<std::pair<UID, NetworkAddress>> allSharedLogs() const {
typedef std::pair<UID, NetworkAddress> IdAddrPair;
std::vector<IdAddrPair> results;

View File

@ -35,7 +35,17 @@ public:
}
if(!self->cursor->hasMessage()) {
Void _ = wait( self->cursor->getMore() );
loop {
choose {
when(Void _ = wait( self->cursor->getMore() )) {
break;
}
when(Void _ = wait( self->localityChanged )) {
self->cursor = self->logSystem->peekSpecial( UID(), self->recoveryLoc, self->tag, self->peekLocality ? self->peekLocality->get() : tagLocalityInvalid );
self->localityChanged = self->peekLocality->onChange();
}
}
}
TraceEvent("PeekNextGetMore").detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Loc", self->recoveryLoc).detail("End", self->logSystem->getEnd());
if(self->recoveryQueueDataSize == 0) {
self->recoveryQueueLoc = self->recoveryLoc;
@ -144,6 +154,6 @@ Future<LogSystemDiskQueueAdapter::CommitMessage> LogSystemDiskQueueAdapter::getC
return pcm.getFuture();
}
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag ) {
return new LogSystemDiskQueueAdapter( logSystem, tag );
}
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<int8_t>> peekLocality ) {
return new LogSystemDiskQueueAdapter( logSystem, tag, peekLocality );
}

View File

@ -40,9 +40,11 @@ public:
// It does, however, peek the specified tag directly at recovery time.
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, bool recover=true ) : logSystem(logSystem), tag(tag), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0) {
if (enableRecovery)
cursor = logSystem->peek( UID(), 0, tag, true );
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<int8_t>> peekLocality, bool recover=true ) : logSystem(logSystem), tag(tag), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0) {
if (enableRecovery) {
localityChanged = peekLocality ? peekLocality->onChange() : Never();
cursor = logSystem->peekSpecial( UID(), 1, tag, peekLocality ? peekLocality->get() : tagLocalityInvalid );
}
}
struct CommitMessage {
@ -74,6 +76,8 @@ public:
virtual int getCommitOverhead() { return 0; } //SOMEDAY: could this be more accurate?
private:
Reference<AsyncVar<int8_t>> peekLocality;
Future<Void> localityChanged;
Reference<ILogSystem::IPeekCursor> cursor;
Tag tag;
@ -93,6 +97,6 @@ private:
friend class LogSystemDiskQueueAdapterImpl;
};
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag );
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<int8_t>> peekLocality );
#endif
#endif

View File

@ -1297,7 +1297,7 @@ ACTOR Future<Void> masterProxyServerCore(
r->value().push_back(std::make_pair<Version,int>(0,0));
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, false);
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference<AsyncVar<int8_t>>(), false);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only a approximate formula for limiting the memory used.

View File

@ -593,13 +593,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end.present() ? end.get() + 1 : getPeekEnd(), tLogs[0]->locality == tagLocalityUpgraded) );
}
Reference<IPeekCursor> peekLocal( UID dbgid, Tag tag, Version begin, Version end ) {
ASSERT(tag.locality >= 0 || tag.locality == tagLocalityUpgraded);
Reference<IPeekCursor> peekLocal( UID dbgid, Tag tag, Version begin, Version end, bool useMergePeekCursors, int8_t peekLocality = tagLocalityInvalid ) {
if(tag.locality >= 0) {
peekLocality = tag.locality;
}
ASSERT(peekLocality >= 0 || peekLocality == tagLocalityUpgraded);
int bestSet = -1;
bool foundSpecial = false;
for(int t = 0; t < tLogs.size(); t++) {
if(tLogs[t]->logServers.size() && (tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || tLogs[t]->locality == tag.locality || tag.locality == tagLocalityUpgraded)) {
if(tLogs[t]->logServers.size() && (tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || tLogs[t]->locality == peekLocality || peekLocality == tagLocalityUpgraded)) {
if( tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}
@ -609,31 +612,48 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(bestSet == -1) {
TraceEvent("TLogPeekLocalNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end);
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
if(useMergePeekCursors) {
throw worker_removed();
} else {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
}
if(begin >= tLogs[bestSet]->startVersion) {
TraceEvent("TLogPeekLocalBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestSet", bestSet).detail("BestSetStart", tLogs[bestSet]->startVersion).detail("LogId", tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )]->get().id());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, begin, end, false, false ) );
if(useMergePeekCursors) {
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logServers, tLogs[bestSet]->bestLocationFor( tag ), tLogs[bestSet]->logServers.size() + 1 - tLogs[bestSet]->tLogReplicationFactor, tag,
begin, end, true, tLogs[bestSet]->tLogLocalities, tLogs[bestSet]->tLogPolicy, tLogs[bestSet]->tLogReplicationFactor) );
} else {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, begin, end, false, false ) );
}
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
if(tLogs[bestSet]->startVersion < end) {
TraceEvent("TLogPeekLocalAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestSet", bestSet).detail("BestSetStart", tLogs[bestSet]->startVersion).detail("LogId", tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )]->get().id());
cursors.push_back( Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, tLogs[bestSet]->startVersion, end, false, false ) ) );
if(useMergePeekCursors) {
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logServers, tLogs[bestSet]->bestLocationFor( tag ), tLogs[bestSet]->logServers.size() + 1 - tLogs[bestSet]->tLogReplicationFactor, tag,
tLogs[bestSet]->startVersion, end, true, tLogs[bestSet]->tLogLocalities, tLogs[bestSet]->tLogPolicy, tLogs[bestSet]->tLogReplicationFactor) ) );
} else {
cursors.push_back( Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, tLogs[bestSet]->startVersion, end, false, false ) ) );
}
}
Version lastBegin = tLogs[bestSet]->startVersion;
int i = 0;
while(begin < lastBegin) {
if(i == oldLogData.size()) {
if(tag == txsTag && cursors.size()) {
break;
}
TraceEvent("TLogPeekLocalDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
throw worker_removed();
}
int bestOldSet = -1;
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalityUpgraded)) {
if(oldLogData[i].tLogs[t]->logServers.size() && (oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || oldLogData[i].tLogs[t]->locality == peekLocality || peekLocality == tagLocalityUpgraded)) {
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded ) {
foundSpecial = true;
}
@ -644,7 +664,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(foundSpecial) {
TraceEvent("TLogPeekLocalFoundSpecial", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end);
cursors.push_back(peekAll(dbgid, begin, std::min(lastBegin, end), tag, false, true));
cursors.push_back(peekAll(dbgid, begin, std::min(lastBegin, end), tag, useMergePeekCursors, !useMergePeekCursors));
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
break;
}
@ -664,7 +684,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TraceEvent("TLogPeekLocalAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end)
.detail("LogServers", oldLogData[i].tLogs[bestOldSet]->logServerString()).detail("ThisBegin", thisBegin).detail("LastBegin", lastBegin);
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logServers, oldLogData[i].tLogs[bestOldSet]->bestLocationFor( tag ), oldLogData[i].tLogs[bestOldSet]->logServers.size() + 1 - oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor, tag,
thisBegin, std::min(lastBegin, end), false, oldLogData[i].tLogs[bestOldSet]->tLogLocalities, oldLogData[i].tLogs[bestOldSet]->tLogPolicy, oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor)));
thisBegin, std::min(lastBegin, end), useMergePeekCursors, oldLogData[i].tLogs[bestOldSet]->tLogLocalities, oldLogData[i].tLogs[bestOldSet]->tLogPolicy, oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor)));
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
}
lastBegin = thisBegin;
@ -676,21 +696,63 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
virtual Reference<IPeekCursor> peekSpecial( UID dbgid, Version begin, Tag tag, int8_t peekLocality ) {
Version localEnd = invalidVersion;
Version end = getEnd();
if(peekLocality >= 0) {
for(auto& it : lockResults) {
if(it.logSet->locality == peekLocality) {
auto versions = TagPartitionedLogSystem::getDurableVersion(dbgid, it);
if(versions.present()) {
localEnd = versions.get().first;
}
break;
}
}
}
TraceEvent("TLogPeekSpecial", dbgid).detail("Begin", begin).detail("End", end).detail("LocalEnd", localEnd).detail("PeekLocality", peekLocality);
if(localEnd == invalidVersion) {
return peekAll(dbgid, begin, end, tag, true, false);
}
try {
if(localEnd >= end) {
return peekLocal(dbgid, tag, begin, end, true, peekLocality);
}
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
cursors.resize(2);
cursors[1] = peekLocal(dbgid, tag, begin, localEnd, true, peekLocality);
cursors[0] = peekAll(dbgid, localEnd, end, tag, true, false);
epochEnds.push_back(LogMessageVersion(localEnd));
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
} catch( Error& e ) {
if(e.code() == error_code_worker_removed) {
return peekAll(dbgid, begin, end, tag, true, false);
}
throw;
}
}
virtual Reference<IPeekCursor> peekSingle( UID dbgid, Version begin, Tag tag, vector<pair<Version,Tag>> history ) {
while(history.size() && begin >= history.back().first) {
history.pop_back();
}
if(history.size() == 0) {
return peekLocal(dbgid, tag, begin, getPeekEnd());
return peekLocal(dbgid, tag, begin, getPeekEnd(), false);
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
cursors.push_back( peekLocal(dbgid, tag, history[0].first, getPeekEnd()) );
cursors.push_back( peekLocal(dbgid, tag, history[0].first, getPeekEnd(), false) );
for(int i = 0; i < history.size(); i++) {
cursors.push_back( peekLocal(dbgid, history[i].second, i+1 == history.size() ? begin : std::max(history[i+1].first, begin), history[i].first) );
cursors.push_back( peekLocal(dbgid, history[i].second, i+1 == history.size() ? begin : std::max(history[i+1].first, begin), history[i].first, false) );
epochEnds.push_back(LogMessageVersion(history[i].first));
}

View File

@ -585,7 +585,16 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
return Void();
}
ACTOR Future<Void> updateLocalityForDcId(Optional<Key> dcId, Reference<ILogSystem> oldLogSystem, Reference<AsyncVar<int8_t>> locality) {
loop {
locality->set( oldLogSystem->getLogSystemConfig().getLocalityForDcId(dcId) );
Void _ = wait( oldLogSystem->onLogSystemConfigChange() );
}
}
ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
state Reference<AsyncVar<int8_t>> myLocality = Reference<AsyncVar<int8_t>>( new AsyncVar<int8_t>() );
state Future<Void> localityUpdater = updateLocalityForDcId(self->myInterface.locality.dcId(), oldLogSystem, myLocality);
// Peek the txnStateTag in oldLogSystem and recover self->txnStateStore
// For now, we also obtain the recovery metadata that the log system obtained during the end_epoch process for comparison
@ -595,7 +604,7 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
// Recover transaction state store
if(self->txnStateStore) self->txnStateStore->close();
self->txnStateLogAdapter = openDiskQueueAdapter( oldLogSystem, txsTag );
self->txnStateLogAdapter = openDiskQueueAdapter( oldLogSystem, txsTag, myLocality );
self->txnStateStore = keyValueStoreLogSystem( self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false, true );
// Versionstamped operations (particularly those applied from DR) define a minimum commit version