foundationdb/fdbserver/LogSystemPeekCursor.actor.cpp

1236 lines
44 KiB
C++

/*
* LogSystemPeekCursor.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/LogSystem.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/Knobs.h"
#include "fdbrpc/ReplicationUtils.h"
#include "flow/actorcompiler.h" // has to be last include
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0),
returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
}
ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, TagsAndMessage const& message, bool hasMsg, Version poppedVersion, Tag tag )
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageAndTags(message), hasMsg(hasMsg),
randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
//TraceEvent("SPC_Clone", randomID);
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
if(hasMsg)
nextMessage();
advanceTo(messageVersion);
}
Reference<ILogSystem::IPeekCursor> ILogSystem::ServerPeekCursor::cloneNoMore() {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( results, messageVersion, end, messageAndTags, hasMsg, poppedVersion, tag ) );
}
void ILogSystem::ServerPeekCursor::setProtocolVersion( ProtocolVersion version ) {
rd.setProtocolVersion(version);
}
Arena& ILogSystem::ServerPeekCursor::arena() { return results.arena; }
ArenaReader* ILogSystem::ServerPeekCursor::reader() {
return &rd;
}
bool ILogSystem::ServerPeekCursor::hasMessage() {
//TraceEvent("SPC_HasMessage", randomID).detail("HasMsg", hasMsg);
return hasMsg;
}
void ILogSystem::ServerPeekCursor::nextMessage() {
//TraceEvent("SPC_NextMessage", randomID).detail("MessageVersion", messageVersion.toString());
ASSERT(hasMsg);
if (rd.empty()) {
messageVersion.reset(std::min(results.end, end.version));
hasMsg = false;
return;
}
if (*(int32_t*)rd.peekBytes(4) == VERSION_HEADER) {
// A version
int32_t dummy;
Version ver;
rd >> dummy >> ver;
//TraceEvent("SPC_ProcessSeq", randomID).detail("MessageVersion", messageVersion.toString()).detail("Ver", ver).detail("Tag", tag.toString());
//ASSERT( ver >= messageVersion.version );
messageVersion.reset(ver);
if( messageVersion >= end ) {
messageVersion = end;
hasMsg = false;
return;
}
ASSERT(!rd.empty());
}
messageAndTags.loadFromArena(&rd, &messageVersion.sub);
// Rewind and consume the header so that reader() starts from the message.
rd.rewind();
rd.readBytes(messageAndTags.getHeaderSize());
hasMsg = true;
//TraceEvent("SPC_NextMessageB", randomID).detail("MessageVersion", messageVersion.toString());
}
StringRef ILogSystem::ServerPeekCursor::getMessage() {
//TraceEvent("SPC_GetMessage", randomID);
StringRef message = messageAndTags.getMessageWithoutTags();
rd.readBytes(message.size()); // Consumes the message.
return message;
}
StringRef ILogSystem::ServerPeekCursor::getMessageWithTags() {
StringRef rawMessage = messageAndTags.getRawMessage();
rd.readBytes(rawMessage.size() - messageAndTags.getHeaderSize()); // Consumes the message.
return rawMessage;
}
VectorRef<Tag> ILogSystem::ServerPeekCursor::getTags() {
return messageAndTags.tags;
}
void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
//TraceEvent("SPC_AdvanceTo", randomID).detail("N", n.toString());
while( messageVersion < n && hasMessage() ) {
getMessage();
nextMessage();
}
if( hasMessage() )
return;
//if( more.isValid() && !more.isReady() ) more.cancel();
if( messageVersion < n ) {
messageVersion = n;
}
}
ACTOR Future<Void> resetChecker( ILogSystem::ServerPeekCursor* self, NetworkAddress addr ) {
self->slowReplies = 0;
self->unknownReplies = 0;
self->fastReplies = 0;
wait(delay(SERVER_KNOBS->PEEK_STATS_INTERVAL));
TraceEvent("SlowPeekStats").detail("SlowReplies", self->slowReplies).detail("FastReplies", self->fastReplies).detail("UnknownReplies", self->unknownReplies);
if(self->slowReplies >= SERVER_KNOBS->PEEK_STATS_SLOW_AMOUNT && self->slowReplies/double(self->slowReplies+self->fastReplies) >= SERVER_KNOBS->PEEK_STATS_SLOW_RATIO) {
FlowTransport::transport().resetConnection(addr);
self->lastReset = now();
}
return Void();
}
ACTOR Future<TLogPeekReply> recordRequestMetrics( ILogSystem::ServerPeekCursor* self, NetworkAddress addr, Future<TLogPeekReply> in ) {
try {
state double startTime = now();
TLogPeekReply t = wait(in);
if(now()-self->lastReset > SERVER_KNOBS->PEEK_RESET_INTERVAL) {
if(now()-startTime > SERVER_KNOBS->PEEK_MAX_LATENCY) {
if(t.messages.size() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES || SERVER_KNOBS->PEEK_COUNT_SMALL_MESSAGES) {
if(self->resetCheck.isReady()) {
self->resetCheck = resetChecker(self, addr);
}
self->slowReplies++;
} else {
self->unknownReplies++;
}
} else {
self->fastReplies++;
}
}
return t;
} catch (Error& e) {
if (e.code() != error_code_broken_promise)
throw;
wait(Never()); // never return
throw internal_error(); // does not happen
}
}
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) {
if( !self->interf || self->messageVersion >= self->end ) {
if( self->hasMessage() )
return Void();
wait( Future<Void>(Never()));
throw internal_error();
}
if(!self->interfaceChanged.isValid()) {
self->interfaceChanged = self->interf->onChange();
}
loop {
state Version expectedBegin = self->messageVersion.version;
try {
if (self->parallelGetMore || self->onlySpilled) {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( recordRequestMetrics( self, self->interf->get().interf().peekMessages.getEndpoint().getPrimaryAddress(), self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
}
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
throw operation_obsolete();
}
} else if (self->futureResults.size() == 0) {
return Void();
}
if( self->hasMessage() )
return Void();
choose {
when( TLogPeekReply res = wait( self->interf->get().present() ? self->futureResults.front() : Never() ) ) {
if(res.begin.get() != expectedBegin) {
throw operation_obsolete();
}
expectedBegin = res.end;
self->futureResults.pop_front();
self->results = res;
self->onlySpilled = res.onlySpilled;
if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
LogMessageVersion skipSeq = self->messageVersion;
self->hasMsg = true;
self->nextMessage();
self->advanceTo(skipSeq);
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when( wait( self->interfaceChanged ) ) {
self->interfaceChanged = self->interf->onChange();
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;
self->onlySpilled = false;
self->futureResults.clear();
}
}
} catch( Error &e ) {
if(e.code() == error_code_end_of_stream) {
self->end.reset( self->messageVersion.version );
return Void();
} else if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
TraceEvent("PeekCursorTimedOut", self->randomID).error(e);
// We *should* never get timed_out(), as it means the TLog got stuck while handling a parallel peek,
// and thus we've likely just wasted 10min.
// timed_out() is sent by cleanupPeekTrackers as value PEEK_TRACKER_EXPIRATION_TIME
ASSERT_WE_THINK(e.code() == error_code_operation_obsolete || SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME < 10);
self->interfaceChanged = self->interf->onChange();
self->randomID = deterministicRandom()->randomUniqueID();
self->sequence = 0;
self->futureResults.clear();
} else {
throw e;
}
}
}
}
ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) {
if( !self->interf || self->messageVersion >= self->end ) {
wait( Future<Void>(Never()));
throw internal_error();
}
try {
loop {
choose {
when( TLogPeekReply res = wait( self->interf->get().present() ?
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled), taskID) ) : Never() ) ) {
self->results = res;
self->onlySpilled = res.onlySpilled;
if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
LogMessageVersion skipSeq = self->messageVersion;
self->hasMsg = true;
self->nextMessage();
self->advanceTo(skipSeq);
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
return Void();
}
when( wait( self->interf->onChange() ) ) {
self->onlySpilled = false;
}
}
}
} catch( Error &e ) {
if(e.code() == error_code_end_of_stream) {
self->end.reset( self->messageVersion.version );
return Void();
}
throw e;
}
}
Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
//TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
if( hasMessage() && !parallelGetMore )
return Void();
if( !more.isValid() || more.isReady() ) {
if (parallelGetMore || onlySpilled || futureResults.size()) {
more = serverPeekParallelGetMore(this, taskID);
} else {
more = serverPeekGetMore(this, taskID);
}
}
return more;
}
ACTOR Future<Void> serverPeekOnFailed( ILogSystem::ServerPeekCursor* self ) {
loop {
choose {
when( wait( self->interf->get().present() ? IFailureMonitor::failureMonitor().onStateEqual( self->interf->get().interf().peekMessages.getEndpoint(), FailureStatus() ) : Never() ) ) { return Void(); }
when( wait( self->interf->onChange() ) ) {}
}
}
}
Future<Void> ILogSystem::ServerPeekCursor::onFailed() {
return serverPeekOnFailed(this);
}
bool ILogSystem::ServerPeekCursor::isActive() {
if( !interf->get().present() )
return false;
if( messageVersion >= end )
return false;
return IFailureMonitor::failureMonitor().getState( interf->get().interf().peekMessages.getEndpoint() ).isAvailable();
}
bool ILogSystem::ServerPeekCursor::isExhausted() {
return messageVersion >= end;
}
const LogMessageVersion& ILogSystem::ServerPeekCursor::version() { return messageVersion; } // Call only after nextMessage(). The sequence of the current message, or results.end if nextMessage() has returned false.
Version ILogSystem::ServerPeekCursor::getMinKnownCommittedVersion() { return results.minKnownCommittedVersion; }
Optional<UID> ILogSystem::ServerPeekCursor::getPrimaryPeekLocation() {
if(interf) {
return interf->get().id();
}
return Optional<UID>();
}
Version ILogSystem::ServerPeekCursor::popped() { return poppedVersion; }
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin )
: serverCursors(serverCursors), bestServer(-1), readQuorum(serverCursors.size()), tag(invalidTag), currentCursor(0), hasNextMessage(false),
messageVersion(begin), randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(0) {
sortedVersions.resize(serverCursors.size());
}
ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end,
bool parallelGetMore, std::vector< LocalityData > const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy, int tLogReplicationFactor )
: bestServer(bestServer), readQuorum(readQuorum), tag(tag), currentCursor(0), hasNextMessage(false), messageVersion(begin), randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(tLogReplicationFactor) {
if(tLogPolicy) {
logSet = Reference<LogSet>( new LogSet() );
logSet->tLogPolicy = tLogPolicy;
logSet->tLogLocalities = tLogLocalities;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}
for( int i = 0; i < logServers.size(); i++ ) {
Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logServers[i], tag, begin, end, bestServer >= 0, parallelGetMore ) );
//TraceEvent("MPC_Starting", randomID).detail("Cursor", cursor->randomID).detail("End", end);
serverCursors.push_back( cursor );
}
sortedVersions.resize(serverCursors.size());
}
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, Reference<LogSet> logSet, int tLogReplicationFactor )
: serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0), hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), logSet(logSet),
randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(tLogReplicationFactor) {
sortedVersions.resize(serverCursors.size());
calcHasMessage();
}
Reference<ILogSystem::IPeekCursor> ILogSystem::MergedPeekCursor::cloneNoMore() {
vector< Reference<ILogSystem::IPeekCursor> > cursors;
for( auto it : serverCursors ) {
cursors.push_back(it->cloneNoMore());
}
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( cursors, messageVersion, bestServer, readQuorum, nextVersion, logSet, tLogReplicationFactor ) );
}
void ILogSystem::MergedPeekCursor::setProtocolVersion( ProtocolVersion version ) {
for( auto it : serverCursors )
if( it->hasMessage() )
it->setProtocolVersion( version );
}
Arena& ILogSystem::MergedPeekCursor::arena() { return serverCursors[currentCursor]->arena(); }
ArenaReader* ILogSystem::MergedPeekCursor::reader() { return serverCursors[currentCursor]->reader(); }
void ILogSystem::MergedPeekCursor::calcHasMessage() {
if(bestServer >= 0) {
if(nextVersion.present()) serverCursors[bestServer]->advanceTo( nextVersion.get() );
if( serverCursors[bestServer]->hasMessage() ) {
messageVersion = serverCursors[bestServer]->version();
currentCursor = bestServer;
hasNextMessage = true;
for (auto& c : serverCursors)
c->advanceTo(messageVersion);
return;
}
auto bestVersion = serverCursors[bestServer]->version();
for (auto& c : serverCursors)
c->advanceTo(bestVersion);
}
hasNextMessage = false;
updateMessage(false);
if(!hasNextMessage && logSet) {
updateMessage(true);
}
}
void ILogSystem::MergedPeekCursor::updateMessage(bool usePolicy) {
loop {
bool advancedPast = false;
sortedVersions.clear();
for(int i = 0; i < serverCursors.size(); i++) {
auto& serverCursor = serverCursors[i];
if (nextVersion.present()) serverCursor->advanceTo(nextVersion.get());
sortedVersions.push_back(std::pair<LogMessageVersion, int>(serverCursor->version(), i));
}
if(usePolicy) {
ASSERT(logSet->tLogPolicy);
std::sort(sortedVersions.begin(), sortedVersions.end());
locations.clear();
for(auto sortedVersion : sortedVersions) {
locations.push_back(logSet->logEntryArray[sortedVersion.second]);
if( locations.size() >= tLogReplicationFactor && logSet->satisfiesPolicy(locations) ) {
messageVersion = sortedVersion.first;
break;
}
}
} else {
std::nth_element(sortedVersions.begin(), sortedVersions.end()-readQuorum, sortedVersions.end());
messageVersion = sortedVersions[sortedVersions.size()-readQuorum].first;
}
for(int i = 0; i < serverCursors.size(); i++) {
auto& c = serverCursors[i];
auto start = c->version();
c->advanceTo(messageVersion);
if( start <= messageVersion && messageVersion < c->version() ) {
advancedPast = true;
TEST(true); //Merge peek cursor advanced past desired sequence
}
}
if(!advancedPast)
break;
}
for(int i = 0; i < serverCursors.size(); i++) {
auto& c = serverCursors[i];
ASSERT_WE_THINK( !c->hasMessage() || c->version() >= messageVersion ); // Seems like the loop above makes this unconditionally true
if (c->version() == messageVersion && c->hasMessage()) {
hasNextMessage = true;
currentCursor = i;
break;
}
}
}
bool ILogSystem::MergedPeekCursor::hasMessage() {
return hasNextMessage;
}
void ILogSystem::MergedPeekCursor::nextMessage() {
nextVersion = version();
nextVersion.get().sub++;
serverCursors[currentCursor]->nextMessage();
calcHasMessage();
ASSERT(hasMessage() || !version().sub);
}
StringRef ILogSystem::MergedPeekCursor::getMessage() { return serverCursors[currentCursor]->getMessage(); }
StringRef ILogSystem::MergedPeekCursor::getMessageWithTags() {
return serverCursors[currentCursor]->getMessageWithTags();
}
VectorRef<Tag> ILogSystem::MergedPeekCursor::getTags() {
return serverCursors[currentCursor]->getTags();
}
void ILogSystem::MergedPeekCursor::advanceTo(LogMessageVersion n) {
bool canChange = false;
for (auto& c : serverCursors) {
if(c->version() < n) {
canChange = true;
c->advanceTo(n);
}
}
if(canChange) {
calcHasMessage();
}
}
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion, TaskPriority taskID) {
loop {
//TraceEvent("MPC_GetMoreA", self->randomID).detail("Start", startVersion.toString());
if(self->bestServer >= 0 && self->serverCursors[self->bestServer]->isActive()) {
ASSERT(!self->serverCursors[self->bestServer]->hasMessage());
wait( self->serverCursors[self->bestServer]->getMore(taskID) || self->serverCursors[self->bestServer]->onFailed() );
} else {
vector<Future<Void>> q;
for (auto& c : self->serverCursors)
if (!c->hasMessage())
q.push_back(c->getMore(taskID));
wait(quorum(q, 1));
}
self->calcHasMessage();
//TraceEvent("MPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
if (self->hasMessage() || self->version() > startVersion) {
return Void();
}
}
}
Future<Void> ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
if( more.isValid() && !more.isReady() ) {
return more;
}
if(!serverCursors.size())
return Never();
auto startVersion = version();
calcHasMessage();
if( hasMessage() )
return Void();
if (nextVersion.present())
advanceTo(nextVersion.get());
ASSERT(!hasMessage());
if (version() > startVersion)
return Void();
more = mergedPeekGetMore(this, startVersion, taskID);
return more;
}
Future<Void> ILogSystem::MergedPeekCursor::onFailed() {
ASSERT(false);
return Never();
}
bool ILogSystem::MergedPeekCursor::isActive() {
ASSERT(false);
return false;
}
bool ILogSystem::MergedPeekCursor::isExhausted() {
return serverCursors[currentCursor]->isExhausted();
}
const LogMessageVersion& ILogSystem::MergedPeekCursor::version() { return messageVersion; }
Version ILogSystem::MergedPeekCursor::getMinKnownCommittedVersion() {
return serverCursors[currentCursor]->getMinKnownCommittedVersion();
}
Optional<UID> ILogSystem::MergedPeekCursor::getPrimaryPeekLocation() {
if(bestServer >= 0) {
return serverCursors[bestServer]->getPrimaryPeekLocation();
}
return Optional<UID>();
}
Version ILogSystem::MergedPeekCursor::popped() {
Version poppedVersion = 0;
for (auto& c : serverCursors)
poppedVersion = std::max(poppedVersion, c->popped());
return poppedVersion;
}
ILogSystem::SetPeekCursor::SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore )
: logSets(logSets), bestSet(bestSet), bestServer(bestServer), tag(tag), currentCursor(0), currentSet(bestSet), hasNextMessage(false), messageVersion(begin), useBestSet(true), randomID(deterministicRandom()->randomUniqueID()) {
serverCursors.resize(logSets.size());
int maxServers = 0;
for( int i = 0; i < logSets.size(); i++ ) {
for( int j = 0; j < logSets[i]->logServers.size(); j++) {
Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logSets[i]->logServers[j], tag, begin, end, true, parallelGetMore ) );
serverCursors[i].push_back( cursor );
}
maxServers = std::max<int>(maxServers, serverCursors[i].size());
}
sortedVersions.resize(maxServers);
}
ILogSystem::SetPeekCursor::SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, std::vector< std::vector< Reference<IPeekCursor> > > const& serverCursors, LogMessageVersion const& messageVersion, int bestSet, int bestServer,
Optional<LogMessageVersion> nextVersion, bool useBestSet ) : logSets(logSets), serverCursors(serverCursors), messageVersion(messageVersion), bestSet(bestSet), bestServer(bestServer), nextVersion(nextVersion), currentSet(bestSet), currentCursor(0),
hasNextMessage(false), useBestSet(useBestSet), randomID(deterministicRandom()->randomUniqueID()) {
int maxServers = 0;
for( int i = 0; i < logSets.size(); i++ ) {
maxServers = std::max<int>(maxServers, serverCursors[i].size());
}
sortedVersions.resize(maxServers);
calcHasMessage();
}
Reference<ILogSystem::IPeekCursor> ILogSystem::SetPeekCursor::cloneNoMore() {
vector< vector< Reference<ILogSystem::IPeekCursor> > > cursors;
cursors.resize(logSets.size());
for( int i = 0; i < logSets.size(); i++ ) {
for( int j = 0; j < logSets[i]->logServers.size(); j++) {
cursors[i].push_back( serverCursors[i][j]->cloneNoMore() );
}
}
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( logSets, cursors, messageVersion, bestSet, bestServer, nextVersion, useBestSet ) );
}
void ILogSystem::SetPeekCursor::setProtocolVersion( ProtocolVersion version ) {
for( auto& cursors : serverCursors ) {
for( auto& it : cursors ) {
if( it->hasMessage() ) {
it->setProtocolVersion( version );
}
}
}
}
Arena& ILogSystem::SetPeekCursor::arena() { return serverCursors[currentSet][currentCursor]->arena(); }
ArenaReader* ILogSystem::SetPeekCursor::reader() { return serverCursors[currentSet][currentCursor]->reader(); }
void ILogSystem::SetPeekCursor::calcHasMessage() {
if(bestSet >= 0 && bestServer >= 0) {
if(nextVersion.present()) {
//TraceEvent("LPC_CalcNext").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("NextVersion", nextVersion.get().toString());
serverCursors[bestSet][bestServer]->advanceTo( nextVersion.get() );
}
if( serverCursors[bestSet][bestServer]->hasMessage() ) {
messageVersion = serverCursors[bestSet][bestServer]->version();
currentSet = bestSet;
currentCursor = bestServer;
hasNextMessage = true;
//TraceEvent("LPC_Calc1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
for (auto& cursors : serverCursors) {
for(auto& c : cursors) {
c->advanceTo(messageVersion);
}
}
return;
}
auto bestVersion = serverCursors[bestSet][bestServer]->version();
for (auto& cursors : serverCursors) {
for (auto& c : cursors) {
c->advanceTo(bestVersion);
}
}
}
hasNextMessage = false;
if(useBestSet) {
updateMessage(bestSet, false); // Use Quorum logic
//TraceEvent("LPC_Calc2").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
if(!hasNextMessage) {
updateMessage(bestSet, true);
//TraceEvent("LPC_Calc3").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
}
} else {
for(int i = 0; i < logSets.size() && !hasNextMessage; i++) {
if(i != bestSet) {
updateMessage(i, false); // Use Quorum logic
}
}
//TraceEvent("LPC_Calc4").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
for(int i = 0; i < logSets.size() && !hasNextMessage; i++) {
if(i != bestSet) {
updateMessage(i, true);
}
}
//TraceEvent("LPC_Calc5").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
}
}
void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) {
loop {
bool advancedPast = false;
sortedVersions.clear();
for(int i = 0; i < serverCursors[logIdx].size(); i++) {
auto& serverCursor = serverCursors[logIdx][i];
if (nextVersion.present()) serverCursor->advanceTo(nextVersion.get());
sortedVersions.push_back(std::pair<LogMessageVersion, int>(serverCursor->version(), i));
//TraceEvent("LPC_Update1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("ServerVer", serverCursor->version().toString()).detail("I", i);
}
if(usePolicy) {
std::sort(sortedVersions.begin(), sortedVersions.end());
locations.clear();
for(auto sortedVersion : sortedVersions) {
locations.push_back(logSets[logIdx]->logEntryArray[sortedVersion.second]);
if( locations.size() >= logSets[logIdx]->tLogReplicationFactor && logSets[logIdx]->satisfiesPolicy(locations) ) {
messageVersion = sortedVersion.first;
break;
}
}
} else {
//(int)oldLogData[i].logServers.size() + 1 - oldLogData[i].tLogReplicationFactor
std::nth_element(sortedVersions.begin(), sortedVersions.end()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor), sortedVersions.end());
messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor)].first;
}
for (auto& cursors : serverCursors) {
for (auto& c : cursors) {
auto start = c->version();
c->advanceTo(messageVersion);
if( start <= messageVersion && messageVersion < c->version() ) {
advancedPast = true;
TEST(true); //Merge peek cursor advanced past desired sequence
}
}
}
if(!advancedPast)
break;
}
for(int i = 0; i < serverCursors[logIdx].size(); i++) {
auto& c = serverCursors[logIdx][i];
ASSERT_WE_THINK( !c->hasMessage() || c->version() >= messageVersion ); // Seems like the loop above makes this unconditionally true
if (c->version() == messageVersion && c->hasMessage()) {
hasNextMessage = true;
currentSet = logIdx;
currentCursor = i;
break;
}
}
}
bool ILogSystem::SetPeekCursor::hasMessage() {
return hasNextMessage;
}
void ILogSystem::SetPeekCursor::nextMessage() {
nextVersion = version();
nextVersion.get().sub++;
serverCursors[currentSet][currentCursor]->nextMessage();
calcHasMessage();
ASSERT(hasMessage() || !version().sub);
}
StringRef ILogSystem::SetPeekCursor::getMessage() { return serverCursors[currentSet][currentCursor]->getMessage(); }
StringRef ILogSystem::SetPeekCursor::getMessageWithTags() { return serverCursors[currentSet][currentCursor]->getMessageWithTags(); }
VectorRef<Tag> ILogSystem::SetPeekCursor::getTags() {
return serverCursors[currentSet][currentCursor]->getTags();
}
void ILogSystem::SetPeekCursor::advanceTo(LogMessageVersion n) {
bool canChange = false;
for( auto& cursors : serverCursors ) {
for (auto& c : cursors) {
if(c->version() < n) {
canChange = true;
c->advanceTo(n);
}
}
}
if(canChange) {
calcHasMessage();
}
}
ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVersion startVersion, TaskPriority taskID) {
loop {
//TraceEvent("LPC_GetMore1", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isActive()) {
ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage());
//TraceEvent("LPC_GetMore2", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
wait( self->serverCursors[self->bestSet][self->bestServer]->getMore(taskID) || self->serverCursors[self->bestSet][self->bestServer]->onFailed() );
self->useBestSet = true;
} else {
//FIXME: if best set is exhausted, do not peek remote servers
bool bestSetValid = self->bestSet >= 0;
if(bestSetValid) {
self->locations.clear();
for( int i = 0; i < self->serverCursors[self->bestSet].size(); i++) {
if(!self->serverCursors[self->bestSet][i]->isActive() && self->serverCursors[self->bestSet][i]->version() <= self->messageVersion) {
self->locations.push_back(self->logSets[self->bestSet]->logEntryArray[i]);
}
}
bestSetValid = self->locations.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->logSets[self->bestSet]->satisfiesPolicy(self->locations);
}
if(bestSetValid || self->logSets.size() == 1) {
if(!self->useBestSet) {
self->useBestSet = true;
self->calcHasMessage();
if (self->hasMessage() || self->version() > startVersion)
return Void();
}
//TraceEvent("LPC_GetMore3", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString()).detail("BestSetSize", self->serverCursors[self->bestSet].size());
vector<Future<Void>> q;
for (auto& c : self->serverCursors[self->bestSet]) {
if (!c->hasMessage()) {
q.push_back(c->getMore(taskID));
if(c->isActive()) {
q.push_back(c->onFailed());
}
}
}
wait(quorum(q, 1));
} else {
//FIXME: this will peeking way too many cursors when satellites exist, and does not need to peek bestSet cursors since we cannot get anymore data from them
vector<Future<Void>> q;
//TraceEvent("LPC_GetMore4", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
for(auto& cursors : self->serverCursors) {
for (auto& c :cursors) {
if (!c->hasMessage()) {
q.push_back(c->getMore(taskID));
}
}
}
wait(quorum(q, 1));
self->useBestSet = false;
}
}
self->calcHasMessage();
//TraceEvent("LPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
if (self->hasMessage() || self->version() > startVersion)
return Void();
}
}
Future<Void> ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
if( more.isValid() && !more.isReady() ) {
return more;
}
auto startVersion = version();
calcHasMessage();
if( hasMessage() )
return Void();
if (nextVersion.present())
advanceTo(nextVersion.get());
ASSERT(!hasMessage());
if (version() > startVersion)
return Void();
more = setPeekGetMore(this, startVersion, taskID);
return more;
}
Future<Void> ILogSystem::SetPeekCursor::onFailed() {
ASSERT(false);
return Never();
}
bool ILogSystem::SetPeekCursor::isActive() {
ASSERT(false);
return false;
}
bool ILogSystem::SetPeekCursor::isExhausted() {
return serverCursors[currentSet][currentCursor]->isExhausted();
}
const LogMessageVersion& ILogSystem::SetPeekCursor::version() { return messageVersion; }
Version ILogSystem::SetPeekCursor::getMinKnownCommittedVersion() {
return serverCursors[currentSet][currentCursor]->getMinKnownCommittedVersion();
}
Optional<UID> ILogSystem::SetPeekCursor::getPrimaryPeekLocation() {
if(bestServer >= 0 && bestSet >= 0) {
return serverCursors[bestSet][bestServer]->getPrimaryPeekLocation();
}
return Optional<UID>();
}
Version ILogSystem::SetPeekCursor::popped() {
Version poppedVersion = 0;
for (auto& cursors : serverCursors) {
for(auto& c : cursors) {
poppedVersion = std::max(poppedVersion, c->popped());
}
}
return poppedVersion;
}
ILogSystem::MultiCursor::MultiCursor( std::vector<Reference<IPeekCursor>> cursors, std::vector<LogMessageVersion> epochEnds ) : cursors(cursors), epochEnds(epochEnds), poppedVersion(0) {
for(int i = 0; i < std::min<int>(cursors.size(),SERVER_KNOBS->MULTI_CURSOR_PRE_FETCH_LIMIT); i++) {
cursors[cursors.size()-i-1]->getMore();
}
}
Reference<ILogSystem::IPeekCursor> ILogSystem::MultiCursor::cloneNoMore() {
return cursors.back()->cloneNoMore();
}
void ILogSystem::MultiCursor::setProtocolVersion( ProtocolVersion version ) {
cursors.back()->setProtocolVersion(version);
}
Arena& ILogSystem::MultiCursor::arena() {
return cursors.back()->arena();
}
ArenaReader* ILogSystem::MultiCursor::reader() {
return cursors.back()->reader();
}
bool ILogSystem::MultiCursor::hasMessage() {
return cursors.back()->hasMessage();
}
void ILogSystem::MultiCursor::nextMessage() {
cursors.back()->nextMessage();
}
StringRef ILogSystem::MultiCursor::getMessage() {
return cursors.back()->getMessage();
}
StringRef ILogSystem::MultiCursor::getMessageWithTags() {
return cursors.back()->getMessageWithTags();
}
VectorRef<Tag> ILogSystem::MultiCursor::getTags() {
return cursors.back()->getTags();
}
void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) {
while( cursors.size() > 1 && n >= epochEnds.back() ) {
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
cursors.pop_back();
epochEnds.pop_back();
}
cursors.back()->advanceTo(n);
}
Future<Void> ILogSystem::MultiCursor::getMore(TaskPriority taskID) {
LogMessageVersion startVersion = cursors.back()->version();
while( cursors.size() > 1 && cursors.back()->version() >= epochEnds.back() ) {
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
cursors.pop_back();
epochEnds.pop_back();
}
if(cursors.back()->version() > startVersion) {
return Void();
}
return cursors.back()->getMore(taskID);
}
Future<Void> ILogSystem::MultiCursor::onFailed() {
return cursors.back()->onFailed();
}
bool ILogSystem::MultiCursor::isActive() {
return cursors.back()->isActive();
}
bool ILogSystem::MultiCursor::isExhausted() {
return cursors.back()->isExhausted();
}
const LogMessageVersion& ILogSystem::MultiCursor::version() {
return cursors.back()->version();
}
Version ILogSystem::MultiCursor::getMinKnownCommittedVersion() {
return cursors.back()->getMinKnownCommittedVersion();
}
Optional<UID> ILogSystem::MultiCursor::getPrimaryPeekLocation() {
return cursors.back()->getPrimaryPeekLocation();
}
Version ILogSystem::MultiCursor::popped() {
return std::max(poppedVersion, cursors.back()->popped());
}
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped), knownUnique(false), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/cursors.size();
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
cursorMessages.resize(cursors.size());
}
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore ) : messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/logServers.size();
messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
cursorMessages.resize(logServers.size());
for( int i = 0; i < logServers.size(); i++ ) {
Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logServers[i], tag, begin, end, false, parallelGetMore ) );
cursors.push_back( cursor );
}
}
void ILogSystem::BufferedCursor::combineMessages() {
if(!hasNextMessage) {
return;
}
std::vector<Tag> tags;
tags.push_back(messages[messageIndex].tags[0]);
for(int i = messageIndex + 1; i < messages.size() && messages[messageIndex].version == messages[i].version; i++) {
tags.push_back(messages[i].tags[0]);
messageIndex = i;
}
auto& msg = messages[messageIndex];
BinaryWriter messageWriter(Unversioned());
messageWriter << uint32_t(msg.message.size() + sizeof(uint32_t) + sizeof(uint16_t) + tags.size()*sizeof(Tag)) << msg.version.sub << uint16_t(tags.size());
for(auto t : tags) {
messageWriter << t;
}
messageWriter.serializeBytes(msg.message);
Standalone<StringRef> val = messageWriter.toValue();
msg.arena = val.arena();
msg.message = val;
msg.tags = VectorRef<Tag>();
for(auto t : tags) {
msg.tags.push_back(msg.arena, t);
}
}
Reference<ILogSystem::IPeekCursor> ILogSystem::BufferedCursor::cloneNoMore() {
ASSERT(false);
return Reference<ILogSystem::IPeekCursor>();
}
void ILogSystem::BufferedCursor::setProtocolVersion( ProtocolVersion version ) {
for(auto& c : cursors) {
c->setProtocolVersion(version);
}
}
Arena& ILogSystem::BufferedCursor::arena() {
return messages[messageIndex].arena;
}
ArenaReader* ILogSystem::BufferedCursor::reader() {
ASSERT(false);
return cursors[0]->reader();
}
bool ILogSystem::BufferedCursor::hasMessage() {
return hasNextMessage;
}
void ILogSystem::BufferedCursor::nextMessage() {
messageIndex++;
if(messageIndex == messages.size()) {
hasNextMessage = false;
}
if(collectTags) {
combineMessages();
}
}
StringRef ILogSystem::BufferedCursor::getMessage() {
ASSERT(!withTags);
return messages[messageIndex].message;
}
StringRef ILogSystem::BufferedCursor::getMessageWithTags() {
ASSERT(withTags);
return messages[messageIndex].message;
}
VectorRef<Tag> ILogSystem::BufferedCursor::getTags() {
ASSERT(withTags);
return messages[messageIndex].tags;
}
void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) {
ASSERT(false);
}
ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, int idx, TaskPriority taskID ) {
loop {
wait(yield());
if(cursor->version().version >= self->end || self->cursorMessages[idx].size() > self->targetQueueSize) {
return Void();
}
wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, cursor->getMinKnownCommittedVersion());
if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
if(cursor->version().version >= self->end) {
return Void();
}
while(cursor->hasMessage()) {
self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? VectorRef<Tag>() : cursor->getTags(), cursor->version()));
cursor->nextMessage();
}
}
}
ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriority taskID ) {
if( self->messageVersion.version >= self->end ) {
wait( Future<Void>(Never()));
throw internal_error();
}
self->messages.clear();
std::vector<Future<Void>> loaders;
loaders.reserve(self->cursors.size());
for(int i = 0; i < self->cursors.size(); i++) {
loaders.push_back(bufferedGetMoreLoader(self, self->cursors[i], i, taskID));
}
state Future<Void> allLoaders = waitForAll(loaders);
state Version minVersion;
loop {
wait( allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID) );
minVersion = self->end;
for(int i = 0; i < self->cursors.size(); i++) {
auto cursor = self->cursors[i];
while(cursor->hasMessage()) {
self->cursorMessages[i].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? VectorRef<Tag>() : cursor->getTags(), cursor->version()));
cursor->nextMessage();
}
minVersion = std::min(minVersion, cursor->version().version);
}
if(minVersion > self->messageVersion.version) {
break;
}
if(allLoaders.isReady()) {
wait(Future<Void>(Never()));
}
}
wait( yield() );
for(auto &it : self->cursorMessages) {
while(!it.empty() && it.front().version.version < minVersion) {
self->messages.push_back(it.front());
it.pop_front();
}
}
if(self->collectTags || self->knownUnique) {
std::sort(self->messages.begin(), self->messages.end());
} else {
uniquify(self->messages);
}
self->messageVersion = LogMessageVersion(minVersion);
self->messageIndex = 0;
self->hasNextMessage = self->messages.size() > 0;
if(self->collectTags) {
self->combineMessages();
}
wait(yield());
if(self->canDiscardPopped && self->poppedVersion > self->version().version) {
TraceEvent(SevWarn, "DiscardingPoppedData", self->randomID).detail("Version", self->version().version).detail("Popped", self->poppedVersion);
self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion));
for(auto cursor : self->cursors) {
cursor->advanceTo(self->messageVersion);
}
self->messageIndex = self->messages.size();
if (self->messages.size() > 0 && self->messages[self->messages.size()-1].version.version < self->poppedVersion) {
self->hasNextMessage = false;
} else {
auto iter = std::lower_bound(self->messages.begin(), self->messages.end(),
ILogSystem::BufferedCursor::BufferedMessage(self->poppedVersion));
self->hasNextMessage = iter != self->messages.end();
if(self->hasNextMessage) {
self->messageIndex = iter - self->messages.begin();
}
}
}
if(self->hasNextMessage) {
self->canDiscardPopped = false;
}
return Void();
}
Future<Void> ILogSystem::BufferedCursor::getMore(TaskPriority taskID) {
if( hasMessage() ) {
return Void();
}
if( !more.isValid() || more.isReady() ) {
more = bufferedGetMore(this, taskID);
}
return more;
}
Future<Void> ILogSystem::BufferedCursor::onFailed() {
ASSERT(false);
return Never();
}
bool ILogSystem::BufferedCursor::isActive() {
ASSERT(false);
return false;
}
bool ILogSystem::BufferedCursor::isExhausted() {
ASSERT(false);
return false;
}
const LogMessageVersion& ILogSystem::BufferedCursor::version() {
if(hasNextMessage) {
return messages[messageIndex].version;
}
return messageVersion;
}
Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() {
return minKnownCommittedVersion;
}
Optional<UID> ILogSystem::BufferedCursor::getPrimaryPeekLocation() {
return Optional<UID>();
}
Version ILogSystem::BufferedCursor::popped() {
if(initialPoppedVersion == poppedVersion) {
return 0;
}
return poppedVersion;
}