fix: when resetting the peekCursor, we cannot discard the popped data if the adapter has already processed data

This commit is contained in:
Evan Tschannen 2019-07-30 13:25:25 -07:00
parent 2301728903
commit 9e3ec2cb33
4 changed files with 18 additions and 15 deletions

View File

@ -658,7 +658,7 @@ struct ILogSystem {
// Same contract as peek(), but can only peek from the logs elected in the same generation.
// If the preferred log server is down, a different log from the same generation will merge results locally before sending them to the log router.
virtual Reference<IPeekCursor> peekTxs( UID dbgid, Version begin, int8_t peekLocality, Version localEnd ) = 0;
virtual Reference<IPeekCursor> peekTxs( UID dbgid, Version begin, int8_t peekLocality, Version localEnd, bool canDiscardPopped ) = 0;
// Same contract as peek(), but only for peeking the txsLocality. It allows specifying a preferred peek locality.
virtual Version getKnownCommittedVersion() = 0;

View File

@ -42,19 +42,19 @@ public:
break;
}
when( wait( self->localityChanged ) ) {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion );
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion, self->totalRecoveredBytes == 0 );
self->localityChanged = self->peekLocality->onChange();
}
when( wait( delay(self->peekTypeSwitches==0 ? SERVER_KNOBS->DISK_QUEUE_ADAPTER_MIN_SWITCH_TIME : SERVER_KNOBS->DISK_QUEUE_ADAPTER_MAX_SWITCH_TIME)) ) {
self->peekTypeSwitches++;
if(self->peekTypeSwitches%3==1) {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion );
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion, self->totalRecoveredBytes == 0 );
self->localityChanged = Never();
} else if(self->peekTypeSwitches%3==2) {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().secondaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion );
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().secondaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion, self->totalRecoveredBytes == 0 );
self->localityChanged = self->peekLocality->onChange();
} else {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion );
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion, self->totalRecoveredBytes == 0 );
self->localityChanged = self->peekLocality->onChange();
}
}
@ -70,14 +70,15 @@ public:
self->recoveryQueueDataSize = 0;
self->recoveryLoc = self->cursor->popped();
self->recoveryQueueLoc = self->recoveryLoc;
self->totalRecoveredBytes = 0;
if(self->peekTypeSwitches%3==1) {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion );
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion, true );
self->localityChanged = Never();
} else if(self->peekTypeSwitches%3==2) {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().secondaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion );
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().secondaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion, true );
self->localityChanged = self->peekLocality->onChange();
} else {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion );
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion, true );
self->localityChanged = self->peekLocality->onChange();
}
self->hasDiscardedData = true;
@ -96,6 +97,7 @@ public:
self->recoveryQueue.push_back( Standalone<StringRef>(self->cursor->getMessage(), self->cursor->arena()) );
self->recoveryQueueDataSize += self->recoveryQueue.back().size();
self->totalRecoveredBytes += self->recoveryQueue.back().size();
self->cursor->nextMessage();
if(!self->cursor->hasMessage()) self->recoveryLoc = self->cursor->version().version;

View File

@ -52,10 +52,10 @@ public:
// It does, however, peek the specified tag directly at recovery time.
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, bool recover=true ) : logSystem(logSystem), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0), hasDiscardedData(false) {
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, bool recover=true ) : logSystem(logSystem), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0), hasDiscardedData(false), totalRecoveredBytes(0) {
if (enableRecovery) {
localityChanged = peekLocality ? peekLocality->onChange() : Never();
cursor = logSystem->peekTxs( UID(), 1, peekLocality ? peekLocality->get().primaryLocality : tagLocalityInvalid, peekLocality ? peekLocality->get().knownCommittedVersion : invalidVersion );
cursor = logSystem->peekTxs( UID(), 1, peekLocality ? peekLocality->get().primaryLocality : tagLocalityInvalid, peekLocality ? peekLocality->get().knownCommittedVersion : invalidVersion, true );
}
}
@ -110,6 +110,7 @@ private:
std::deque< Promise<CommitMessage> > commitMessages;
Version nextCommit;
bool hasDiscardedData;
int totalRecoveredBytes;
friend class LogSystemDiskQueueAdapterImpl;
};

View File

@ -743,7 +743,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
virtual Reference<IPeekCursor> peekTxs( UID dbgid, Version begin, int8_t peekLocality, Version localEnd ) {
virtual Reference<IPeekCursor> peekTxs( UID dbgid, Version begin, int8_t peekLocality, Version localEnd, bool canDiscardPopped ) {
Version end = getEnd();
if(!tLogs.size()) {
TraceEvent("TLogPeekTxsNoLogs", dbgid);
@ -769,7 +769,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
cursors.push_back(peekAll(dbgid, begin, end, txsTag, true));
}
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end, false, false, true) );
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end, false, false, canDiscardPopped) );
}
try {
@ -783,7 +783,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
cursors.push_back(peekLocal(dbgid, txsTag, begin, end, true, peekLocality));
}
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end, false, false, true) );
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end, false, false, canDiscardPopped) );
}
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
@ -803,7 +803,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
allCursors.push_back(peekAll(dbgid, localEnd, end, txsTag, true));
}
cursors[1] = Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(localCursors, begin, localEnd, false, false, true) );
cursors[1] = Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(localCursors, begin, localEnd, false, false, canDiscardPopped) );
cursors[0] = Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(allCursors, localEnd, end, false, false, false) );
epochEnds.emplace_back(localEnd);
@ -819,7 +819,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
cursors.push_back(peekAll(dbgid, begin, end, txsTag, true));
}
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end, false, false, true) );
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end, false, false, canDiscardPopped) );
}
throw;
}