implement popped on bufferedCursor

This commit is contained in:
Evan Tschannen 2019-07-29 21:19:47 -07:00
parent d8b14fe372
commit 5bb322b483
3 changed files with 29 additions and 12 deletions

View File

@ -521,9 +521,8 @@ struct ILogSystem {
std::vector<Reference<IPeekCursor>> cursors;
std::vector<LogMessageVersion> epochEnds;
Version poppedVersion;
bool needsPopped;
MultiCursor( std::vector<Reference<IPeekCursor>> cursors, std::vector<LogMessageVersion> epochEnds, bool needsPopped = true );
MultiCursor( std::vector<Reference<IPeekCursor>> cursors, std::vector<LogMessageVersion> epochEnds );
virtual Reference<IPeekCursor> cloneNoMore();
virtual void setProtocolVersion( ProtocolVersion version );
@ -578,6 +577,9 @@ struct ILogSystem {
Version end;
bool hasNextMessage;
bool withTags;
Version poppedVersion;
Version initialPoppedVersion;
bool hasReturnedData;
//FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade.
bool collectTags;
@ -781,7 +783,7 @@ struct LogPushData : NonCopyable {
next_message_tags.insert(next_message_tags.end(), tags.begin(), tags.end());
}
void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations = false ) {
void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations, Version commitVersion ) {
if( !usePreviousLocations ) {
prev_tags.clear();
if(logSystem->hasRemoteLogs()) {
@ -795,12 +797,14 @@ struct LogPushData : NonCopyable {
next_message_tags.clear();
}
uint32_t subseq = this->subsequence++;
uint32_t msgsize = rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag)*prev_tags.size();
for(int loc : msg_locations) {
messagesWriter[loc] << uint32_t(rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag)*prev_tags.size()) << subseq << uint16_t(prev_tags.size());
messagesWriter[loc] << msgsize << subseq << uint16_t(prev_tags.size());
for(auto& tag : prev_tags)
messagesWriter[loc] << tag;
messagesWriter[loc].serializeBytes(rawMessageWithoutLength);
}
TraceEvent("AddMessage").detail("Tags", describe(prev_tags)).detail("Version", commitVersion).detail("Subseq", subseq).detail("MsgSize", msgsize);
}
template <class T>

View File

@ -810,7 +810,7 @@ Version ILogSystem::SetPeekCursor::popped() {
return poppedVersion;
}
ILogSystem::MultiCursor::MultiCursor( std::vector<Reference<IPeekCursor>> cursors, std::vector<LogMessageVersion> epochEnds, bool needsPopped ) : cursors(cursors), epochEnds(epochEnds), needsPopped(needsPopped), poppedVersion(0) {
ILogSystem::MultiCursor::MultiCursor( std::vector<Reference<IPeekCursor>> cursors, std::vector<LogMessageVersion> epochEnds ) : cursors(cursors), epochEnds(epochEnds), poppedVersion(0) {
for(int i = 0; i < std::min<int>(cursors.size(),SERVER_KNOBS->MULTI_CURSOR_PRE_FETCH_LIMIT); i++) {
cursors[cursors.size()-i-1]->getMore();
}
@ -854,7 +854,7 @@ const std::vector<Tag>& ILogSystem::MultiCursor::getTags() {
void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) {
while( cursors.size() > 1 && n >= epochEnds.back() ) {
if(needsPopped) poppedVersion = std::max(poppedVersion, cursors.back()->popped());
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
cursors.pop_back();
epochEnds.pop_back();
}
@ -864,7 +864,7 @@ void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) {
Future<Void> ILogSystem::MultiCursor::getMore(TaskPriority taskID) {
LogMessageVersion startVersion = cursors.back()->version();
while( cursors.size() > 1 && cursors.back()->version() >= epochEnds.back() ) {
if(needsPopped) poppedVersion = std::max(poppedVersion, cursors.back()->popped());
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
cursors.pop_back();
epochEnds.pop_back();
}
@ -895,11 +895,10 @@ Version ILogSystem::MultiCursor::getMinKnownCommittedVersion() {
}
Version ILogSystem::MultiCursor::popped() {
ASSERT(needsPopped);
return std::max(poppedVersion, cursors.back()->popped());
}
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0) {
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), hasReturnedData(false) {
messages.reserve(10000);
}
@ -994,6 +993,10 @@ ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Refe
}
}
wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
if(!self->hasReturnedData) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
}
}
@ -1032,6 +1035,14 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
}
wait(yield());
if(!self->hasReturnedData) {
while(self->hasNextMessage && self->version().version < self->poppedVersion) {
self->nextMessage();
}
if(self->hasNextMessage) {
self->hasReturnedData = true;
}
}
return Void();
}
@ -1069,6 +1080,8 @@ Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() {
}
Version ILogSystem::BufferedCursor::popped() {
ASSERT(false);
return invalidVersion;
if(initialPoppedVersion == poppedVersion) {
return 0;
}
return poppedVersion;
}

View File

@ -807,7 +807,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
cursors[0] = Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(allCursors, localEnd, end, false) );
epochEnds.emplace_back(localEnd);
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds, false) );
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
} catch( Error& e ) {
if(e.code() == error_code_worker_removed) {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;