Merge pull request #2307 from etschannen/feature-enable-parallel-peek
Parallel peeking was not enabled when fetching from log routers
This commit is contained in:
commit
c805ed11e8
|
@ -8,6 +8,7 @@ Release Notes
|
|||
Fixes
|
||||
-----
|
||||
|
||||
* Significantly improved the rate at which the transaction logs in a remote region can pull data from the primary region. `(PR #2307) <https://github.com/apple/foundationdb/pull/2307>`_.
|
||||
* The ``system_kv_size_bytes`` status field could report a size much larger than the actual size of the system keyspace. `(PR #2305) <https://github.com/apple/foundationdb/pull/2305>`_.
|
||||
|
||||
6.2.7
|
||||
|
|
|
@ -339,6 +339,9 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
|
|||
try {
|
||||
peekId = req.sequence.get().first;
|
||||
sequence = req.sequence.get().second;
|
||||
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) {
|
||||
throw timed_out();
|
||||
}
|
||||
auto& trackerData = self->peekTracker[peekId];
|
||||
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
|
||||
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
|
||||
|
|
|
@ -1732,7 +1732,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted, bool parallelGetMore ) {
|
||||
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted ) {
|
||||
state Future<Void> dbInfoChange = Void();
|
||||
state Reference<ILogSystem::IPeekCursor> r;
|
||||
state Version tagAt = beginVersion;
|
||||
|
@ -1746,7 +1746,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
|
|||
}
|
||||
when( wait( dbInfoChange ) ) {
|
||||
if( logData->logSystem->get() ) {
|
||||
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, parallelGetMore );
|
||||
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, true );
|
||||
} else {
|
||||
r = Reference<ILogSystem::IPeekCursor>();
|
||||
}
|
||||
|
@ -1883,7 +1883,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
|
|||
if(!logData->isPrimary) {
|
||||
std::vector<Tag> tags;
|
||||
tags.push_back(logData->remoteTag);
|
||||
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true, true) );
|
||||
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true) );
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -2247,10 +2247,10 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
|||
logData->logRouterPopToVersion = req.recoverAt;
|
||||
std::vector<Tag> tags;
|
||||
tags.push_back(logData->remoteTag);
|
||||
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true, false) || logData->removed);
|
||||
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed);
|
||||
} else if(!req.recoverTags.empty()) {
|
||||
ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion);
|
||||
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false, true) || logData->removed);
|
||||
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed);
|
||||
}
|
||||
pulledRecoveryVersions = true;
|
||||
logData->knownCommittedVersion = req.recoverAt;
|
||||
|
|
|
@ -2130,7 +2130,7 @@ void removeLog( TLogData* self, Reference<LogData> logData ) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted, bool parallelGetMore ) {
|
||||
ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, std::vector<Tag> tags, Version beginVersion, Optional<Version> endVersion, bool poppedIsKnownCommitted ) {
|
||||
state Future<Void> dbInfoChange = Void();
|
||||
state Reference<ILogSystem::IPeekCursor> r;
|
||||
state Version tagAt = beginVersion;
|
||||
|
@ -2148,7 +2148,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
|
|||
}
|
||||
when( wait( dbInfoChange ) ) {
|
||||
if( logData->logSystem->get() ) {
|
||||
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, parallelGetMore );
|
||||
r = logData->logSystem->get()->peek( logData->logId, tagAt, endVersion, tags, true );
|
||||
} else {
|
||||
r = Reference<ILogSystem::IPeekCursor>();
|
||||
}
|
||||
|
@ -2285,7 +2285,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
|
|||
if(!logData->isPrimary) {
|
||||
std::vector<Tag> tags;
|
||||
tags.push_back(logData->remoteTag);
|
||||
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true, true) );
|
||||
logData->addActor.send( pullAsyncData(self, logData, tags, pulledRecoveryVersions ? logData->recoveredAt + 1 : logData->unrecoveredBefore, Optional<Version>(), true) );
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -2678,10 +2678,10 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
|||
logData->logRouterPopToVersion = req.recoverAt;
|
||||
std::vector<Tag> tags;
|
||||
tags.push_back(logData->remoteTag);
|
||||
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true, false) || logData->removed);
|
||||
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) || logData->removed);
|
||||
} else if(!req.recoverTags.empty()) {
|
||||
ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion);
|
||||
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false, true) || logData->removed);
|
||||
wait(pullAsyncData(self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) || logData->removed);
|
||||
}
|
||||
pulledRecoveryVersions = true;
|
||||
logData->knownCommittedVersion = req.recoverAt;
|
||||
|
|
|
@ -553,21 +553,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
if(bestSet == -1) {
|
||||
TraceEvent("TLogPeekRemoteNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
|
||||
}
|
||||
if(begin >= lastBegin) {
|
||||
TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
|
||||
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
|
||||
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
|
||||
} else {
|
||||
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
|
||||
std::vector< LogMessageVersion > epochEnds;
|
||||
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
|
||||
cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
|
||||
cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
|
||||
int i = 0;
|
||||
while(begin < lastBegin) {
|
||||
if(i == oldLogData.size()) {
|
||||
TraceEvent("TLogPeekRemoteDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
|
||||
}
|
||||
|
||||
int bestOldSet = -1;
|
||||
|
@ -584,14 +584,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
if(bestOldSet == -1) {
|
||||
TraceEvent("TLogPeekRemoteNoOldBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
|
||||
}
|
||||
|
||||
if(thisBegin < lastBegin) {
|
||||
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
|
||||
.detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", oldLogData[i].tLogs[bestOldSet]->startVersion);
|
||||
cursors.emplace_back(new ILogSystem::MergedPeekCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
|
||||
thisBegin, lastBegin, false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0));
|
||||
thisBegin, lastBegin, parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0));
|
||||
epochEnds.emplace_back(lastBegin);
|
||||
lastBegin = thisBegin;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue