fix: buffered cursor would start multiple bufferedGetMore actors

advance all of the cursors to the poppedVersion
This commit is contained in:
Evan Tschannen 2019-07-30 14:42:05 -07:00
parent b5cb7919b6
commit 7ac7eb82f2
2 changed files with 11 additions and 2 deletions

View File

@ -581,6 +581,7 @@ struct ILogSystem {
Version poppedVersion;
Version initialPoppedVersion;
bool canDiscardPopped;
Future<Void> more;
//FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade.
bool collectTags;

View File

@ -1038,6 +1038,9 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
if(self->canDiscardPopped && self->poppedVersion > self->version().version) {
TraceEvent(SevWarn, "DiscardingPoppedData").detail("Version", self->version().version).detail("Popped", self->poppedVersion);
self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion));
for(auto& cursor : self->cursors) {
cursor->advanceTo(self->messageVersion);
}
self->messageIndex = self->messages.size();
if (self->messages.size() > 0 && self->messages[self->messages.size()-1].version < self->messageVersion) {
self->hasNextMessage = false;
@ -1057,9 +1060,14 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
}
Future<Void> ILogSystem::BufferedCursor::getMore(TaskPriority taskID) {
if( hasMessage() )
if( hasMessage() ) {
return Void();
return bufferedGetMore(this, taskID);
}
if( !more.isValid() || more.isReady() ) {
more = bufferedGetMore(this, taskID);
}
return more;
}
Future<Void> ILogSystem::BufferedCursor::onFailed() {