fix: we need to restart the peek cursor when the known committed version becomes available
This commit is contained in:
parent
a92fc911ac
commit
e7e1c634e0
|
@ -624,9 +624,13 @@ struct ILogSystem {
|
|||
// Same contract as peek(), but can only peek from the logs elected in the same generation.
|
||||
// If the preferred log server is down, a different log from the same generation will merge results locally before sending them to the log router.
|
||||
|
||||
virtual Reference<IPeekCursor> peekSpecial( UID dbgid, Version begin, Tag tag, int8_t peekLocality ) = 0;
|
||||
virtual Reference<IPeekCursor> peekSpecial( UID dbgid, Version begin, Tag tag, int8_t peekLocality, Version localEnd ) = 0;
|
||||
// Same contract as peek(), but it allows specifying a preferred peek locality for tags that do not have locality
|
||||
|
||||
virtual Version getKnownCommittedVersion(int8_t loc) = 0;
|
||||
|
||||
virtual Future<Void> onKnownCommittedVersionChange(int8_t loc) = 0;
|
||||
|
||||
virtual void pop( Version upTo, Tag tag, Version knownCommittedVersion = 0, int8_t popLocality = tagLocalityInvalid ) = 0;
|
||||
// Permits, but does not require, the log subsystem to strip `tag` from any or all messages with message versions < (upTo,0)
|
||||
// The popping of any given message may be arbitrarily delayed.
|
||||
|
|
|
@ -41,7 +41,7 @@ public:
|
|||
break;
|
||||
}
|
||||
when(Void _ = wait( self->localityChanged )) {
|
||||
self->cursor = self->logSystem->peekSpecial( UID(), self->recoveryLoc, self->tag, self->peekLocality ? self->peekLocality->get() : tagLocalityInvalid );
|
||||
self->cursor = self->logSystem->peekSpecial( UID(), self->recoveryLoc, self->tag, self->peekLocality ? self->peekLocality->get().first : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().second : invalidVersion );
|
||||
self->localityChanged = self->peekLocality->onChange();
|
||||
}
|
||||
}
|
||||
|
@ -154,6 +154,6 @@ Future<LogSystemDiskQueueAdapter::CommitMessage> LogSystemDiskQueueAdapter::getC
|
|||
return pcm.getFuture();
|
||||
}
|
||||
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<int8_t>> peekLocality ) {
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<std::pair<int8_t,Version>>> peekLocality ) {
|
||||
return new LogSystemDiskQueueAdapter( logSystem, tag, peekLocality );
|
||||
}
|
||||
|
|
|
@ -40,10 +40,10 @@ public:
|
|||
|
||||
// It does, however, peek the specified tag directly at recovery time.
|
||||
|
||||
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<int8_t>> peekLocality, bool recover=true ) : logSystem(logSystem), tag(tag), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0) {
|
||||
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<std::pair<int8_t,Version>>> peekLocality, bool recover=true ) : logSystem(logSystem), tag(tag), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0) {
|
||||
if (enableRecovery) {
|
||||
localityChanged = peekLocality ? peekLocality->onChange() : Never();
|
||||
cursor = logSystem->peekSpecial( UID(), 1, tag, peekLocality ? peekLocality->get() : tagLocalityInvalid );
|
||||
cursor = logSystem->peekSpecial( UID(), 1, tag, peekLocality ? peekLocality->get().first : tagLocalityInvalid, peekLocality ? peekLocality->get().second : invalidVersion );
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -76,7 +76,7 @@ public:
|
|||
virtual int getCommitOverhead() { return 0; } //SOMEDAY: could this be more accurate?
|
||||
|
||||
private:
|
||||
Reference<AsyncVar<int8_t>> peekLocality;
|
||||
Reference<AsyncVar<std::pair<int8_t,Version>>> peekLocality;
|
||||
Future<Void> localityChanged;
|
||||
Reference<ILogSystem::IPeekCursor> cursor;
|
||||
Tag tag;
|
||||
|
@ -97,6 +97,6 @@ private:
|
|||
friend class LogSystemDiskQueueAdapterImpl;
|
||||
};
|
||||
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<int8_t>> peekLocality );
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Tag tag, Reference<AsyncVar<std::pair<int8_t,Version>>> peekLocality );
|
||||
|
||||
#endif
|
||||
|
|
|
@ -1297,7 +1297,7 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
r->value().push_back(std::make_pair<Version,int>(0,0));
|
||||
|
||||
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
|
||||
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference<AsyncVar<int8_t>>(), false);
|
||||
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference<AsyncVar<std::pair<int8_t,Version>>>(), false);
|
||||
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
|
||||
|
||||
// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only a approximate formula for limiting the memory used.
|
||||
|
|
|
@ -696,21 +696,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
}
|
||||
|
||||
virtual Reference<IPeekCursor> peekSpecial( UID dbgid, Version begin, Tag tag, int8_t peekLocality ) {
|
||||
Version localEnd = invalidVersion;
|
||||
virtual Reference<IPeekCursor> peekSpecial( UID dbgid, Version begin, Tag tag, int8_t peekLocality, Version localEnd ) {
|
||||
Version end = getEnd();
|
||||
if(peekLocality >= 0) {
|
||||
for(auto& it : lockResults) {
|
||||
if(it.logSet->locality == peekLocality) {
|
||||
auto versions = TagPartitionedLogSystem::getDurableVersion(dbgid, it);
|
||||
if(versions.present()) {
|
||||
localEnd = versions.get().first;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("TLogPeekSpecial", dbgid).detail("Begin", begin).detail("End", end).detail("LocalEnd", localEnd).detail("PeekLocality", peekLocality);
|
||||
if(localEnd == invalidVersion) {
|
||||
return peekAll(dbgid, begin, end, tag, true, false);
|
||||
|
@ -835,6 +822,28 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
virtual Version getKnownCommittedVersion(int8_t loc) {
|
||||
for(auto& it : lockResults) {
|
||||
if(it.logSet->locality == loc) {
|
||||
auto versions = TagPartitionedLogSystem::getDurableVersion(dbgid, it);
|
||||
if(versions.present()) {
|
||||
return versions.get().first;
|
||||
}
|
||||
return invalidVersion;
|
||||
}
|
||||
}
|
||||
return invalidVersion;
|
||||
}
|
||||
|
||||
virtual Future<Void> onKnownCommittedVersionChange(int8_t loc) {
|
||||
for(auto& it : lockResults) {
|
||||
if(it.logSet->locality == loc) {
|
||||
return TagPartitionedLogSystem::getDurableVersionChanged(it);
|
||||
}
|
||||
}
|
||||
return Never();
|
||||
}
|
||||
|
||||
void popLogRouter( Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality ) { //FIXME: do not need to pop all generations of old logs
|
||||
if (!upTo) return;
|
||||
for(auto& t : tLogs) {
|
||||
|
|
|
@ -585,15 +585,21 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updateLocalityForDcId(Optional<Key> dcId, Reference<ILogSystem> oldLogSystem, Reference<AsyncVar<int8_t>> locality) {
|
||||
ACTOR Future<Void> updateLocalityForDcId(Optional<Key> dcId, Reference<ILogSystem> oldLogSystem, Reference<AsyncVar<std::pair<int8_t,Version>>> locality) {
|
||||
loop {
|
||||
locality->set( oldLogSystem->getLogSystemConfig().getLocalityForDcId(dcId) );
|
||||
Void _ = wait( oldLogSystem->onLogSystemConfigChange() );
|
||||
int8_t loc = oldLogSystem->getLogSystemConfig().getLocalityForDcId(dcId);
|
||||
Version ver = locality->get().second;
|
||||
if(ver == invalidVersion || loc != locality->get().first) {
|
||||
ver = oldLogSystem->getKnownCommittedVersion(loc);
|
||||
}
|
||||
locality->set( std::make_pair(loc,ver) );
|
||||
TraceEvent("UpdatedLocalityForDcId").detail("DcId", printable(dcId)).detail("Locality", loc).detail("Version", ver);
|
||||
Void _ = wait( oldLogSystem->onLogSystemConfigChange() || oldLogSystem->onKnownCommittedVersionChange(loc) );
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
|
||||
state Reference<AsyncVar<int8_t>> myLocality = Reference<AsyncVar<int8_t>>( new AsyncVar<int8_t>() );
|
||||
state Reference<AsyncVar<std::pair<int8_t,Version>>> myLocality = Reference<AsyncVar<std::pair<int8_t,Version>>>( new AsyncVar<std::pair<int8_t,Version>>(std::make_pair(tagLocalityInvalid, invalidVersion) ) );
|
||||
state Future<Void> localityUpdater = updateLocalityForDcId(self->myInterface.locality.dcId(), oldLogSystem, myLocality);
|
||||
// Peek the txnStateTag in oldLogSystem and recover self->txnStateStore
|
||||
|
||||
|
|
Loading…
Reference in New Issue