fix: the cursor was not reset when the disk adapter was reset

added a buggy to cause reset to happen more often in simulation
This commit is contained in:
Evan Tschannen 2019-07-30 12:58:18 -07:00
parent 1d326e3dc8
commit 8f887ccaa5
2 changed files with 13 additions and 2 deletions

View File

@ -63,13 +63,24 @@ public:
TraceEvent("PeekNextGetMore").detail("Queue", self->recoveryQueue.size()).detail("Bytes", bytes).detail("Loc", self->recoveryLoc)
.detail("End", self->logSystem->getEnd()).detail("HasMessage", self->cursor->hasMessage()).detail("Version", self->cursor->version().version);
if(self->cursor->popped() != 0) {
if(self->cursor->popped() != 0 || (!self->hasDiscardedData && BUGGIFY_WITH_PROB(0.01))) {
TEST(true); //disk adapter reset
TraceEvent(SevWarnAlways, "DiskQueueAdapterReset").detail("Version", self->cursor->popped());
self->recoveryQueue.clear();
self->recoveryQueueDataSize = 0;
self->recoveryLoc = self->cursor->popped();
self->recoveryQueueLoc = self->recoveryLoc;
if(self->peekTypeSwitches%3==1) {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, tagLocalityInvalid, invalidVersion );
self->localityChanged = Never();
} else if(self->peekTypeSwitches%3==2) {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().secondaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion );
self->localityChanged = self->peekLocality->onChange();
} else {
self->cursor = self->logSystem->peekTxs( UID(), self->recoveryLoc, self->peekLocality ? self->peekLocality->get().primaryLocality : tagLocalityInvalid, self->peekLocality ? self->peekLocality->get().knownCommittedVersion : invalidVersion );
self->localityChanged = self->peekLocality->onChange();
}
self->hasDiscardedData = true;
throw disk_adapter_reset();
}

View File

@ -52,7 +52,7 @@ public:
// It does, however, peek the specified tag directly at recovery time.
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, bool recover=true ) : logSystem(logSystem), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0) {
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, bool recover=true ) : logSystem(logSystem), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0), hasDiscardedData(false) {
if (enableRecovery) {
localityChanged = peekLocality ? peekLocality->onChange() : Never();
cursor = logSystem->peekTxs( UID(), 1, peekLocality ? peekLocality->get().primaryLocality : tagLocalityInvalid, peekLocality ? peekLocality->get().knownCommittedVersion : invalidVersion );