From 85c315f6848d44fd596d5c580b834f347376c57d Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Fri, 1 Nov 2019 14:02:44 -0700 Subject: [PATCH 1/2] Fix: parallelPeekMore was not enabled when peeking from log routers --- fdbserver/LogRouter.actor.cpp | 3 +++ fdbserver/OldTLogServer_6_0.actor.cpp | 10 +++++----- fdbserver/TLogServer.actor.cpp | 10 +++++----- fdbserver/TagPartitionedLogSystem.actor.cpp | 12 ++++++------ 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 840227759c..53fa69b163 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -339,6 +339,9 @@ ACTOR Future logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r try { peekId = req.sequence.get().first; sequence = req.sequence.get().second; + if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) { + throw timed_out(); + } auto& trackerData = self->peekTracker[peekId]; if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled)); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 12a5bd6d94..10626eb241 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1732,7 +1732,7 @@ void removeLog( TLogData* self, Reference logData ) { } } -ACTOR Future pullAsyncData( TLogData* self, Reference logData, std::vector tags, Version beginVersion, Optional endVersion, bool poppedIsKnownCommitted, bool parallelGetMore ) { +ACTOR Future pullAsyncData( TLogData* self, Reference logData, std::vector tags, Version beginVersion, Optional endVersion, bool poppedIsKnownCommitted ) { state Future dbInfoChange = Void(); state Reference r; state Version tagAt = beginVersion; @@ -1746,7 +1746,7 @@ ACTOR Future pullAsyncData( TLogData* self, 
Reference logData, st } when( wait( dbInfoChange ) ) { if( logData->logSystem->get() ) { - r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, parallelGetMore ); + r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, true ); } else { r = Reference(); } @@ -1883,7 +1883,7 @@ ACTOR Future tLogCore( TLogData* self, Reference logData, TLogInt if(!logData->isPrimary) { std::vector tags; tags.push_back(logData->remoteTag); - logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional(), true, true) ); + logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional(), true) ); } try { @@ -2247,10 +2247,10 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit logData->logRouterPopToVersion = req.recoverAt; std::vector tags; tags.push_back(logData->remoteTag); - wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true, false) || logData->removed); + wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed); } else if(!req.recoverTags.empty()) { ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion); - wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false, true) || logData->removed); + wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed); } pulledRecoveryVersions = true; logData->knownCommittedVersion = req.recoverAt; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index b5578bedd7..a4c85f6ead 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -2130,7 +2130,7 @@ void removeLog( TLogData* self, Reference logData ) { } } -ACTOR Future pullAsyncData( TLogData* 
self, Reference logData, std::vector tags, Version beginVersion, Optional endVersion, bool poppedIsKnownCommitted, bool parallelGetMore ) { +ACTOR Future pullAsyncData( TLogData* self, Reference logData, std::vector tags, Version beginVersion, Optional endVersion, bool poppedIsKnownCommitted ) { state Future dbInfoChange = Void(); state Reference r; state Version tagAt = beginVersion; @@ -2148,7 +2148,7 @@ ACTOR Future pullAsyncData( TLogData* self, Reference logData, st } when( wait( dbInfoChange ) ) { if( logData->logSystem->get() ) { - r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, parallelGetMore ); + r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, true ); } else { r = Reference(); } @@ -2285,7 +2285,7 @@ ACTOR Future tLogCore( TLogData* self, Reference logData, TLogInt if(!logData->isPrimary) { std::vector tags; tags.push_back(logData->remoteTag); - logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional(), true, true) ); + logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? 
logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional(), true) ); } try { @@ -2678,10 +2678,10 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit logData->logRouterPopToVersion = req.recoverAt; std::vector tags; tags.push_back(logData->remoteTag); - wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true, false) || logData->removed); + wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed); } else if(!req.recoverTags.empty()) { ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion); - wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false, true) || logData->removed); + wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed); } pulledRecoveryVersions = true; logData->knownCommittedVersion = req.recoverAt; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 9aa91105e8..35616454d8 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -553,21 +553,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, false ) ); + return Reference( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) ); } if(begin >= lastBegin) { TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString()); - return Reference( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector(), Reference(), 0 ) ); + return Reference( new ILogSystem::MergedPeekCursor( 
tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), parallelGetMore, std::vector(), Reference(), 0 ) ); } else { std::vector< Reference > cursors; std::vector< LogMessageVersion > epochEnds; TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString()); - cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector(), Reference(), 0 ) ); + cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), parallelGetMore, std::vector(), Reference(), 0 ) ); int i = 0; while(begin < lastBegin) { if(i == oldLogData.size()) { TraceEvent("TLogPeekRemoteDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size()); - return Reference( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, false ) ); + return Reference( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) ); } int bestOldSet = -1; @@ -584,14 +584,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, false ) ); + return Reference( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) ); } if(thisBegin < lastBegin) { TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString()) .detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", 
oldLogData[i].tLogs[bestOldSet]->startVersion); cursors.emplace_back(new ILogSystem::MergedPeekCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag, - thisBegin, lastBegin, false, std::vector(), Reference(), 0)); + thisBegin, lastBegin, parallelGetMore, std::vector(), Reference(), 0)); epochEnds.emplace_back(lastBegin); lastBegin = thisBegin; } From f4143c4f50efde7ddb49c50196872dea00740187 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Fri, 1 Nov 2019 14:07:01 -0700 Subject: [PATCH 2/2] updated release notes --- documentation/sphinx/source/release-notes.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 98e18f76fc..a761cd2389 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -8,6 +8,7 @@ Release Notes Fixes ----- +* Significantly improved the rate at which the transaction logs in a remote region can pull data from the primary region. `(PR #2307) <https://github.com/apple/foundationdb/pull/2307>`_. * The ``system_kv_size_bytes`` status field could report a size much larger than the actual size of the system keyspace. `(PR #2305) <https://github.com/apple/foundationdb/pull/2305>`_. 6.2.7