diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 538c41df96..53c9bc4785 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -341,7 +341,9 @@ ACTOR Future logRouterPop( LogRouterData* self, TLogPopRequest req ) { Void _ = wait(yield(TaskUpdateStorage)); } - self->logSystem->get()->pop(minPopped, self->routerTag); + if(self->logSystem->get()) { + self->logSystem->get()->pop(minPopped, self->routerTag); + } req.reply.send(Void()); return Void(); } @@ -362,7 +364,7 @@ ACTOR Future logRouterCore( loop choose { when( Void _ = wait( dbInfoChange ) ) { dbInfoChange = db->onChange(); - if( db->get().recoveryState >= RecoveryState::FULLY_RECOVERED && logSet < db->get().logSystemConfig.tLogs.size() && + if( db->get().recoveryState >= RecoveryState::FULLY_RECOVERED && logSet < db->get().logSystemConfig.tLogs.size() && std::count( db->get().logSystemConfig.tLogs[logSet].logRouters.begin(), db->get().logSystemConfig.tLogs[logSet].logRouters.end(), interf.id() ) ) { logRouterData.logSystem->set(ILogSystem::fromServerDBInfo( logRouterData.dbgid, db->get() )); } else { diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index d0109ba3a7..5b5612dfb7 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -189,6 +189,9 @@ struct ILogSystem { // (3) the cursor cannot return any more results virtual bool isActive() = 0; + //returns true if the cursor cannot return any more results + virtual bool isExhausted() = 0; + // Returns the smallest possible message version which the current message (if any) or a subsequent message might have // (If hasMessage(), this is therefore the message version of the current message) virtual LogMessageVersion version() = 0; @@ -252,6 +255,8 @@ struct ILogSystem { virtual bool isActive(); + virtual bool isExhausted(); + virtual LogMessageVersion version(); virtual Version popped(); @@ -313,6 +318,8 @@ struct ILogSystem { virtual bool isActive(); + virtual bool isExhausted(); + virtual LogMessageVersion version(); virtual Version popped(); @@ -369,6 +376,8 @@ struct ILogSystem { virtual bool isActive(); + virtual bool isExhausted(); + virtual LogMessageVersion version(); virtual Version popped(); @@ -413,6 +422,8 @@ struct ILogSystem { virtual bool isActive(); + virtual bool isExhausted(); + virtual LogMessageVersion version(); virtual Version popped(); diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index be1e906914..f858ee9846 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -233,6 +233,10 @@ bool ILogSystem::ServerPeekCursor::isActive() { return IFailureMonitor::failureMonitor().getState( interf->get().interf().peekMessages.getEndpoint() ).isAvailable(); } +bool ILogSystem::ServerPeekCursor::isExhausted() { + return messageVersion >= end; +} + LogMessageVersion ILogSystem::ServerPeekCursor::version() { return messageVersion; } // Call only after nextMessage(). The sequence of the current message, or results.end if nextMessage() has returned false. Version ILogSystem::ServerPeekCursor::popped() { return poppedVersion; } @@ -356,6 +360,10 @@ void ILogSystem::MergedPeekCursor::advanceTo(LogMessageVersion n) { } ACTOR Future mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion) { + if(self->bestServer >= 0 && self->serverCursors[self->bestServer]->isExhausted()) { + return Never(); + } + loop { //TraceEvent("MPC_getMoreA", self->randomID).detail("start", startVersion.toString()); if(self->bestServer >= 0 && self->serverCursors[self->bestServer]->isActive()) { @@ -402,6 +410,11 @@ bool ILogSystem::MergedPeekCursor::isActive() { return false; } +bool ILogSystem::MergedPeekCursor::isExhausted() { + ASSERT(false); + return false; +} + LogMessageVersion ILogSystem::MergedPeekCursor::version() { return messageVersion; } Version ILogSystem::MergedPeekCursor::popped() { @@ -469,8 +482,8 @@ void ILogSystem::SetPeekCursor::calcHasMessage() { } } + hasNextMessage = false; if(useBestSet) { - hasNextMessage = false; updateMessage(bestSet, false); // Use Quorum logic if(!hasNextMessage) { @@ -575,12 +588,17 @@ void ILogSystem::SetPeekCursor::advanceTo(LogMessageVersion n) { ACTOR Future setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVersion startVersion) { loop { - //TraceEvent("LPC_getMoreA", self->randomID).detail("start", startVersion.toString()); + if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isExhausted()) { + return Never(); + } + + //TraceEvent("LPC_getMore1", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag); if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isActive()) { ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage()); Void _ = wait( self->serverCursors[self->bestSet][self->bestServer]->getMore() || self->serverCursors[self->bestSet][self->bestServer]->onFailed() ); self->useBestSet = true; } else { + //FIXME: if best set is exhausted, do not peek remote servers bool bestSetValid = self->bestSet >= 0; if(bestSetValid) { self->localityGroup.clear(); @@ -645,6 +663,11 @@ bool ILogSystem::SetPeekCursor::isActive() { return false; } +bool ILogSystem::SetPeekCursor::isExhausted() { + ASSERT(false); + return false; +} + LogMessageVersion ILogSystem::SetPeekCursor::version() { return messageVersion; } Version ILogSystem::SetPeekCursor::popped() { @@ -717,6 +740,10 @@ bool ILogSystem::MultiCursor::isActive() { return cursors.back()->isActive(); } +bool ILogSystem::MultiCursor::isExhausted() { + return cursors.back()->isActive(); +} + LogMessageVersion ILogSystem::MultiCursor::version() { return cursors.back()->version(); } diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index de7fd4d89d..fcb2169c00 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -729,7 +729,9 @@ ACTOR Future commitBatch( // txnState (transaction subsystem state) tag: message extracted from log adapter bool firstMessage = true; for(auto m : msg.messages) { - toCommit.addTag(txsTag); + if(firstMessage) { + toCommit.addTag(txsTag); + } toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage); firstMessage = false; } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 4ea512dab1..2c83493443 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -853,7 +853,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted newRemoteEpoch( TagPartitionedLogSystem* self, vector remoteTLogWorkers, vector logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, Version recoveryVersion, Tag minTag, int logNum ) + ACTOR static Future newRemoteEpoch( TagPartitionedLogSystem* self, Reference oldLogSystem, vector remoteTLogWorkers, vector logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, Tag minTag, int logNum ) { //recruit temporary log routers and update registration with them state int tempLogRouters = std::max(logRouterWorkers.size(), SERVER_KNOBS->MIN_TAG - minTag + 1); @@ -883,8 +883,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedgetLogSystemConfig(); - req.recoverAt = recoveryVersion; + req.recoverFrom = oldLogSystem->getLogSystemConfig(); + req.recoverAt = oldLogSystem->epochEndVersion.get(); + req.knownCommittedVersion = oldLogSystem->knownCommittedVersion; req.epoch = recoveryCount; req.remoteTag = SERVER_KNOBS->MAX_TAG + i; } @@ -894,7 +895,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[logNum]->updateLocalitySet(remoteTLogWorkers); vector locations; - for( Tag tag : self->epochEndTags ) { + for( Tag tag : oldLogSystem->epochEndTags ) { locations.clear(); self->tLogs[logNum]->getPushLocations( vector(1, tag), locations, 0 ); for(int loc : locations) @@ -997,7 +998,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0]->logServers.size(); i++) recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); logSystem->recoveryComplete = waitForAll(recoveryComplete); - logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, oldLogSystem->epochEndVersion.get(), minTag, 1); + logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, minTag, 1); Void _ = wait(logSystem->remoteRecovery); return logSystem;