fix: storage servers did not properly pull data when configuring from a fearless setup to a non-fearless setup
This commit is contained in:
parent
fa9089c2e8
commit
7e434348ce
|
@ -338,6 +338,7 @@ struct ILogSystem {
|
|||
UID randomID;
|
||||
|
||||
SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore );
|
||||
SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, std::vector< std::vector< Reference<IPeekCursor> > > const& serverCursors, LogMessageVersion const& messageVersion, int bestSet, int bestServer, Optional<LogMessageVersion> nextVersion );
|
||||
|
||||
virtual Reference<IPeekCursor> cloneNoMore();
|
||||
virtual void setProtocolVersion( uint64_t version );
|
||||
|
|
|
@ -506,9 +506,26 @@ ILogSystem::SetPeekCursor::SetPeekCursor( std::vector<Reference<LogSet>> const&
|
|||
sortedVersions.resize(maxServers);
|
||||
}
|
||||
|
||||
ILogSystem::SetPeekCursor::SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, std::vector< std::vector< Reference<IPeekCursor> > > const& serverCursors, LogMessageVersion const& messageVersion, int bestSet, int bestServer,
|
||||
Optional<LogMessageVersion> nextVersion ) : logSets(logSets), serverCursors(serverCursors), messageVersion(messageVersion), bestSet(bestSet), bestServer(bestServer), nextVersion(nextVersion), currentSet(bestSet), currentCursor(0),
|
||||
hasNextMessage(false), useBestSet(true), randomID(g_random->randomUniqueID()) {
|
||||
int maxServers = 0;
|
||||
for( int i = 0; i < logSets.size(); i++ ) {
|
||||
maxServers = std::max<int>(maxServers, serverCursors[i].size());
|
||||
}
|
||||
sortedVersions.resize(maxServers);
|
||||
calcHasMessage();
|
||||
}
|
||||
|
||||
Reference<ILogSystem::IPeekCursor> ILogSystem::SetPeekCursor::cloneNoMore() {
|
||||
ASSERT(false); //not implemented
|
||||
throw internal_error();
|
||||
vector< vector< Reference<ILogSystem::IPeekCursor> > > cursors;
|
||||
cursors.resize(logSets.size());
|
||||
for( int i = 0; i < logSets.size(); i++ ) {
|
||||
for( int j = 0; j < logSets[i]->logServers.size(); j++) {
|
||||
cursors[i].push_back( serverCursors[i][j]->cloneNoMore() );
|
||||
}
|
||||
}
|
||||
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( logSets, cursors, messageVersion, bestSet, bestServer, nextVersion ) );
|
||||
}
|
||||
|
||||
void ILogSystem::SetPeekCursor::setProtocolVersion( uint64_t version ) {
|
||||
|
|
|
@ -404,131 +404,143 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
return waitForAll(quorumResults);
|
||||
}
|
||||
|
||||
Reference<IPeekCursor> peekAll( Version begin, Version end, Tag tag, bool parallelGetMore ) {
|
||||
int bestSet = -1;
|
||||
int nextBestSet = -1;
|
||||
std::vector<Reference<LogSet>> localSets;
|
||||
Version lastBegin = 0;
|
||||
for(auto& log : tLogs) {
|
||||
if(log->isLocal && log->logServers.size()) {
|
||||
lastBegin = std::max(lastBegin, log->startVersion);
|
||||
localSets.push_back(log);
|
||||
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
|
||||
bestSet = localSets.size()-1;
|
||||
nextBestSet = bestSet;
|
||||
}
|
||||
if(log->hasBestPolicy && bestSet == -1) {
|
||||
nextBestSet = localSets.size()-1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(begin >= lastBegin) {
|
||||
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
|
||||
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, end, parallelGetMore ) );
|
||||
} else {
|
||||
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
|
||||
std::vector< LogMessageVersion > epochEnds;
|
||||
|
||||
if(lastBegin < end) {
|
||||
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
|
||||
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, lastBegin, end, parallelGetMore)) );
|
||||
}
|
||||
int i = 0;
|
||||
while(begin < lastBegin) {
|
||||
if(i == oldLogData.size()) {
|
||||
if(tag == txsTag) {
|
||||
break;
|
||||
}
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
int bestOldSet = -1;
|
||||
int nextBestOldSet = -1;
|
||||
std::vector<Reference<LogSet>> localOldSets;
|
||||
Version thisBegin = begin;
|
||||
for(auto& log : oldLogData[i].tLogs) {
|
||||
if(log->isLocal && log->logServers.size()) {
|
||||
thisBegin = std::max(thisBegin, log->startVersion);
|
||||
localOldSets.push_back(log);
|
||||
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
|
||||
bestOldSet = localOldSets.size()-1;
|
||||
nextBestOldSet = bestOldSet;
|
||||
}
|
||||
if(log->hasBestPolicy && bestOldSet == -1) {
|
||||
nextBestOldSet = localOldSets.size()-1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(thisBegin < lastBegin) {
|
||||
if(thisBegin < end) {
|
||||
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet == -1 ? nextBestOldSet : bestOldSet,
|
||||
bestOldSet >= 0 ? localOldSets[bestOldSet]->bestLocationFor( tag ) : -1, tag, thisBegin, std::min(lastBegin, end), parallelGetMore)) );
|
||||
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
|
||||
}
|
||||
lastBegin = thisBegin;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
|
||||
}
|
||||
}
|
||||
|
||||
Reference<IPeekCursor> peekRemote( Version begin, Tag tag, bool parallelGetMore ) {
|
||||
int bestSet = -1;
|
||||
Version lastBegin = 0;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->isLocal) {
|
||||
lastBegin = std::max(lastBegin, tLogs[t]->startVersion);
|
||||
}
|
||||
|
||||
if(tLogs[t]->logRouters.size()) {
|
||||
ASSERT(bestSet == -1);
|
||||
bestSet = t;
|
||||
}
|
||||
}
|
||||
if(bestSet == -1) {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
if(begin >= lastBegin) {
|
||||
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) );
|
||||
} else {
|
||||
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
|
||||
std::vector< LogMessageVersion > epochEnds;
|
||||
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
|
||||
int i = 0;
|
||||
while(begin < lastBegin) {
|
||||
if(i == oldLogData.size()) {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
int bestOldSet = -1;
|
||||
Version thisBegin = begin;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->isLocal) {
|
||||
thisBegin = std::max(thisBegin, oldLogData[i].tLogs[t]->startVersion);
|
||||
}
|
||||
|
||||
if(oldLogData[i].tLogs[t]->logRouters.size()) {
|
||||
ASSERT(bestOldSet == -1);
|
||||
bestOldSet = t;
|
||||
}
|
||||
}
|
||||
if(bestOldSet == -1) {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
if(thisBegin < lastBegin) {
|
||||
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
|
||||
thisBegin, lastBegin, false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
|
||||
epochEnds.push_back(LogMessageVersion(lastBegin));
|
||||
lastBegin = thisBegin;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
|
||||
}
|
||||
}
|
||||
|
||||
virtual Reference<IPeekCursor> peek( Version begin, Tag tag, bool parallelGetMore ) {
|
||||
if(!tLogs.size()) {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
if(tag.locality == tagLocalityRemoteLog) {
|
||||
int bestSet = -1;
|
||||
Version lastBegin = 0;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->isLocal) {
|
||||
lastBegin = std::max(lastBegin, tLogs[t]->startVersion);
|
||||
}
|
||||
|
||||
if(tLogs[t]->logRouters.size()) {
|
||||
ASSERT(bestSet == -1);
|
||||
bestSet = t;
|
||||
}
|
||||
}
|
||||
if(bestSet == -1) {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
if(begin >= lastBegin) {
|
||||
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) );
|
||||
} else {
|
||||
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
|
||||
std::vector< LogMessageVersion > epochEnds;
|
||||
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
|
||||
int i = 0;
|
||||
while(begin < lastBegin) {
|
||||
if(i == oldLogData.size()) {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
int bestOldSet = -1;
|
||||
Version thisBegin = begin;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->isLocal) {
|
||||
thisBegin = std::max(thisBegin, oldLogData[i].tLogs[t]->startVersion);
|
||||
}
|
||||
|
||||
if(oldLogData[i].tLogs[t]->logRouters.size()) {
|
||||
ASSERT(bestOldSet == -1);
|
||||
bestOldSet = t;
|
||||
}
|
||||
}
|
||||
if(bestOldSet == -1) {
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
if(thisBegin < lastBegin) {
|
||||
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
|
||||
thisBegin, lastBegin, false, std::vector<LocalityData>(), IRepPolicyRef(), 0 ) ) );
|
||||
epochEnds.push_back(LogMessageVersion(lastBegin));
|
||||
lastBegin = thisBegin;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
|
||||
}
|
||||
|
||||
return peekRemote(begin, tag, parallelGetMore);
|
||||
} else {
|
||||
int bestSet = -1;
|
||||
int nextBestSet = -1;
|
||||
std::vector<Reference<LogSet>> localSets;
|
||||
Version lastBegin = 0;
|
||||
for(auto& log : tLogs) {
|
||||
if(log->isLocal && log->logServers.size()) {
|
||||
lastBegin = std::max(lastBegin, log->startVersion);
|
||||
localSets.push_back(log);
|
||||
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
|
||||
bestSet = localSets.size()-1;
|
||||
nextBestSet = bestSet;
|
||||
}
|
||||
if(log->hasBestPolicy && bestSet == -1) {
|
||||
nextBestSet = localSets.size()-1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(begin >= lastBegin) {
|
||||
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
|
||||
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) );
|
||||
} else {
|
||||
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
|
||||
std::vector< LogMessageVersion > epochEnds;
|
||||
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet == -1 ? nextBestSet : bestSet,
|
||||
bestSet >= 0 ? localSets[bestSet]->bestLocationFor( tag ) : -1, tag, lastBegin, getPeekEnd(), parallelGetMore)) );
|
||||
int i = 0;
|
||||
while(begin < lastBegin) {
|
||||
if(i == oldLogData.size()) {
|
||||
if(tag == txsTag) {
|
||||
break;
|
||||
}
|
||||
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
|
||||
}
|
||||
|
||||
int bestOldSet = -1;
|
||||
int nextBestOldSet = -1;
|
||||
std::vector<Reference<LogSet>> localOldSets;
|
||||
Version thisBegin = begin;
|
||||
for(auto& log : oldLogData[i].tLogs) {
|
||||
if(log->isLocal && log->logServers.size()) {
|
||||
thisBegin = std::max(thisBegin, log->startVersion);
|
||||
localOldSets.push_back(log);
|
||||
if(log->hasBestPolicy && (log->locality == tag.locality || tag.locality == tagLocalitySpecial || log->locality == tagLocalitySpecial || log->locality == tagLocalityUpgraded)) {
|
||||
bestOldSet = localOldSets.size()-1;
|
||||
nextBestOldSet = bestOldSet;
|
||||
}
|
||||
if(log->hasBestPolicy && bestOldSet == -1) {
|
||||
nextBestOldSet = localOldSets.size()-1;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(thisBegin < lastBegin) {
|
||||
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet == -1 ? nextBestOldSet : bestOldSet,
|
||||
bestOldSet >= 0 ? localOldSets[bestOldSet]->bestLocationFor( tag ) : -1, tag, thisBegin, lastBegin, parallelGetMore)) );
|
||||
epochEnds.push_back(LogMessageVersion(lastBegin));
|
||||
lastBegin = thisBegin;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
||||
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
|
||||
}
|
||||
return peekAll(begin, std::numeric_limits<Version>::max(), tag, parallelGetMore);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -550,8 +562,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
Reference<IPeekCursor> peekLocal( Tag tag, Version begin, Version end ) {
|
||||
int bestSet = -1;
|
||||
bool foundSpecial = false;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->logServers.size() && tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || (tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
if( tLogs[t]->locality == tagLocalitySpecial ) {
|
||||
foundSpecial = true;
|
||||
}
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
|
@ -579,13 +595,22 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
int bestOldSet = -1;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->logServers.size() && oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || (oldLogData[i].tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
if( oldLogData[i].tLogs[t]->locality == tagLocalitySpecial ) {
|
||||
foundSpecial = true;
|
||||
}
|
||||
bestOldSet = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(bestOldSet == -1) {
|
||||
i++;
|
||||
continue;
|
||||
if(!foundSpecial) {
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
cursors.push_back(peekAll(begin, std::min(lastBegin, end), tag, false));
|
||||
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Version thisBegin = std::max(oldLogData[i].tLogs[bestOldSet]->startVersion, begin);
|
||||
|
|
Loading…
Reference in New Issue