diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index ee8722a462..da1f1c3af8 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -38,7 +38,7 @@ void uniquify( Collection& c ) { c.resize( std::unique(c.begin(), c.end()) - c.begin() ); } -class LogSet { +class LogSet : NonCopyable, public ReferenceCounted { public: std::vector>>> logServers; std::vector>>> logRouters; @@ -327,7 +327,7 @@ struct ILogSystem { }; struct SetPeekCursor : IPeekCursor, ReferenceCounted { - std::vector logSets; + std::vector> logSets; std::vector< std::vector< Reference > > serverCursors; Tag tag; int bestSet, bestServer, currentSet, currentCursor; @@ -339,7 +339,7 @@ struct ILogSystem { bool useBestSet; UID randomID; - SetPeekCursor( std::vector const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore ); + SetPeekCursor( std::vector> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore ); virtual Reference cloneNoMore(); @@ -532,7 +532,13 @@ struct LogPushData : NonCopyable { // Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version explicit LogPushData(Reference logSystem) : logSystem(logSystem), subsequence(1) { - tags.resize( logSystem->getLogSystemConfig().tLogs.size() ); + int totalSize = 0; + for(auto& log : logSystem->getLogSystemConfig().tLogs) { + if(log.isLocal) { + totalSize += log.tLogs.size(); + } + } + tags.resize( totalSize ); for(int i = 0; i < tags.size(); i++) { messagesWriter.push_back( BinaryWriter( AssumeVersion(currentProtocolVersion) ) ); } diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index d97143e378..180d4889e1 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -411,13 +411,13 @@ Version ILogSystem::MergedPeekCursor::popped() { return poppedVersion; } -ILogSystem::SetPeekCursor::SetPeekCursor( std::vector const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore ) +ILogSystem::SetPeekCursor::SetPeekCursor( std::vector> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore ) : logSets(logSets), bestSet(bestSet), bestServer(bestServer), tag(tag), currentCursor(0), currentSet(bestSet), hasNextMessage(false), messageVersion(begin), useBestSet(true), randomID(g_random->randomUniqueID()) { serverCursors.resize(logSets.size()); int maxServers = 0; for( int i = 0; i < logSets.size(); i++ ) { - for( int j = 0; j < logSets[i].logServers.size(); j++) { - Reference cursor( new ILogSystem::ServerPeekCursor( logSets[i].logServers[j], tag, begin, end, true, parallelGetMore ) ); + for( int j = 0; j < logSets[i]->logServers.size(); j++) { + Reference cursor( new ILogSystem::ServerPeekCursor( logSets[i]->logServers[j], tag, begin, end, true, parallelGetMore ) ); serverCursors[i].push_back( cursor ); } maxServers = std::max(maxServers, serverCursors[i].size()); @@ -506,18 +506,18 @@ void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) { std::sort(sortedVersions.begin(), sortedVersions.end()); for(auto sortedVersion : sortedVersions) { - auto& locality = logSets[logIdx].tLogLocalities[sortedVersion.second]; + auto& locality = logSets[logIdx]->tLogLocalities[sortedVersion.second]; localityGroup.add(locality); - if( localityGroup.size() >= logSets[logIdx].tLogReplicationFactor && localityGroup.validate(logSets[logIdx].tLogPolicy) ) { + if( localityGroup.size() >= logSets[logIdx]->tLogReplicationFactor && localityGroup.validate(logSets[logIdx]->tLogPolicy) ) { messageVersion = sortedVersion.first; break; } } } else { //(int)oldLogData[i].logServers.size() + 1 - oldLogData[i].tLogReplicationFactor - std::nth_element(sortedVersions.begin(), sortedVersions.end()-(logSets[logIdx].logServers.size()+1-logSets[logIdx].tLogReplicationFactor), sortedVersions.end()); - messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx].logServers.size()+1-logSets[logIdx].tLogReplicationFactor)].first; + std::nth_element(sortedVersions.begin(), sortedVersions.end()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor), sortedVersions.end()); + messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor)].first; } for(int i = 0; i < serverCursors[logIdx].size(); i++) { @@ -586,10 +586,10 @@ ACTOR Future setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer self->localityGroup.clear(); for( int i = 0; i < self->serverCursors[self->bestSet].size(); i++) { if(!self->serverCursors[self->bestSet][i]->isActive()) { - self->localityGroup.add(self->logSets[self->bestSet].tLogLocalities[i]); + self->localityGroup.add(self->logSets[self->bestSet]->tLogLocalities[i]); } } - bestSetValid = self->localityGroup.size() < self->logSets[self->bestSet].tLogReplicationFactor || !self->localityGroup.validate(self->logSets[self->bestSet].tLogPolicy); + bestSetValid = self->localityGroup.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->localityGroup.validate(self->logSets[self->bestSet]->tLogPolicy); } if(bestSetValid) { vector> q; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 4b8d3df68c..64e2112545 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -43,7 +43,7 @@ ACTOR static Future reportTLogCommitErrors( Future commitReply, UID } struct OldLogData { - std::vector tLogs; + std::vector> tLogs; Version epochEnd; OldLogData() : epochEnd(0) {} @@ -52,7 +52,7 @@ struct OldLogData { struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted { UID dbgid; int logSystemType; - std::vector tLogs; + std::vector> tLogs; int minRouters; // new members @@ -88,8 +88,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedget().id().toString() + ((j == tLogs[i].logServers.size() - 1) ? " " : ", "); + for( int j = 0; j < tLogs[i]->logServers.size(); j++) { + result = result + tLogs[i]->logServers[j]->get().id().toString() + ((j == tLogs[i]->logServers.size() - 1) ? " " : ", "); } } return result; @@ -108,43 +108,48 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSystem( new TagPartitionedLogSystem(dbgid, locality) ); - for( auto& it : lsConf.tLogs ) { - LogSet logSet; - for( auto& log : it.tLogs) { - logSet.logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); + logSystem->tLogs.resize(lsConf.tLogs.size()); + for( int i = 0; i < lsConf.tLogs.size(); i++ ) { + Reference logSet = Reference( new LogSet() ); + logSystem->tLogs[i] = logSet; + TLogSet const& tLogSet = lsConf.tLogs[i]; + for( auto& log : tLogSet.tLogs) { + logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); } - for( auto& log : it.logRouters) { - logSet.logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); + for( auto& log : tLogSet.logRouters) { + logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } - logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum; - logSet.tLogReplicationFactor = it.tLogReplicationFactor; - logSet.tLogPolicy = it.tLogPolicy; - logSet.tLogLocalities = it.tLogLocalities; - logSet.isLocal = it.isLocal; - logSet.hasBest = it.hasBest; - logSet.updateLocalitySet(); - logSystem->tLogs.push_back(logSet); - if(logSet.logRouters.size() > 0) logSystem->minRouters = std::min(logSystem->minRouters, logSet.logRouters.size()); + logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; + logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; + logSet->tLogPolicy = tLogSet.tLogPolicy; + logSet->tLogLocalities = tLogSet.tLogLocalities; + logSet->isLocal = tLogSet.isLocal; + logSet->hasBest = tLogSet.hasBest; + logSet->updateLocalitySet(); + + if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min(logSystem->minRouters, logSet->logRouters.size()); } logSystem->oldLogData.resize(lsConf.oldTLogs.size()); for( int i = 0; i < lsConf.oldTLogs.size(); i++ ) { - for( auto& it : lsConf.oldTLogs[i].tLogs ) { - LogSet logSet; - for( auto & log : it.tLogs) { - logSet.logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); + logSystem->oldLogData[i].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); + for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) { + Reference logSet = Reference( new LogSet() ); + logSystem->oldLogData[i].tLogs[j] = logSet; + TLogSet const& tLogData = lsConf.oldTLogs[i].tLogs[j]; + for( auto & log : tLogData.tLogs) { + logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); } - for( auto & log : it.logRouters) { - logSet.logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); + for( auto & log : tLogData.logRouters) { + logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } - logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum; - logSet.tLogReplicationFactor = it.tLogReplicationFactor; - logSet.tLogPolicy = it.tLogPolicy; - logSet.tLogLocalities = it.tLogLocalities; - logSet.isLocal = it.isLocal; - logSet.hasBest = it.hasBest; + logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum; + logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor; + logSet->tLogPolicy = tLogData.tLogPolicy; + logSet->tLogLocalities = tLogData.tLogLocalities; + logSet->isLocal = tLogData.isLocal; + logSet->hasBest = tLogData.hasBest; //logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs - logSystem->oldLogData[i].tLogs.push_back(logSet); } logSystem->oldLogData[i].epochEnd = lsConf.oldTLogs[i].epochEnd; } @@ -159,44 +164,48 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSystem( new TagPartitionedLogSystem(dbgid, locality) ); if(lsConf.oldTLogs.size()) { - for( auto& it : lsConf.oldTLogs[0].tLogs ) { - LogSet logSet; - for( auto & log : it.tLogs) { - logSet.logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); + logSystem->tLogs.resize( lsConf.oldTLogs[0].tLogs.size()); + for( int i = 0; i < lsConf.oldTLogs[0].tLogs.size(); i++ ) { + Reference logSet = Reference( new LogSet() ); + logSystem->tLogs[i] = logSet; + TLogSet const& tLogSet = lsConf.oldTLogs[0].tLogs[i]; + for( auto & log : tLogSet.tLogs) { + logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); } - for( auto & log : it.logRouters) { - logSet.logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); + for( auto & log : tLogSet.logRouters) { + logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } - logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum; - logSet.tLogReplicationFactor = it.tLogReplicationFactor; - logSet.tLogPolicy = it.tLogPolicy; - logSet.tLogLocalities = it.tLogLocalities; - logSet.isLocal = it.isLocal; - logSet.hasBest = it.hasBest; - //logSet.updateLocalitySet(); we do not update the locality set, since we never push to old logs - logSystem->tLogs.push_back(logSet); - if(logSet.logRouters.size() > 0) logSystem->minRouters = std::min(logSystem->minRouters, logSet.logRouters.size()); + logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; + logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; + logSet->tLogPolicy = tLogSet.tLogPolicy; + logSet->tLogLocalities = tLogSet.tLogLocalities; + logSet->isLocal = tLogSet.isLocal; + logSet->hasBest = tLogSet.hasBest; + //logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs + if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min(logSystem->minRouters, logSet->logRouters.size()); } //logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd; logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1); for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) { - for( auto& it : lsConf.oldTLogs[i].tLogs ) { - LogSet logSet; - for( auto & log : it.tLogs) { - logSet.logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); + logSystem->oldLogData[i-1].tLogs.resize(lsConf.oldTLogs[i].tLogs.size()); + for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) { + Reference logSet = Reference( new LogSet() ); + logSystem->oldLogData[i-1].tLogs[j] = logSet; + TLogSet const& tLogSet = lsConf.oldTLogs[i].tLogs[j]; + for( auto & log : tLogSet.tLogs) { + logSet->logServers.push_back( Reference>>( new AsyncVar>( log ) ) ); } - for( auto & log : it.logRouters) { - logSet.logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); + for( auto & log : tLogSet.logRouters) { + logSet->logRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } - logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum; - logSet.tLogReplicationFactor = it.tLogReplicationFactor; - logSet.tLogPolicy = it.tLogPolicy; - logSet.tLogLocalities = it.tLogLocalities; - logSet.isLocal = it.isLocal; - logSet.hasBest = it.hasBest; - //logSet.updateLocalitySet(); we do not update the locality set, since we never push to old logs - logSystem->oldLogData[i-1].tLogs.push_back(logSet); + logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; + logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; + logSet->tLogPolicy = tLogSet.tLogPolicy; + logSet->tLogLocalities = tLogSet.tLogLocalities; + logSet->isLocal = tLogSet.isLocal; + logSet->hasBest = tLogSet.hasBest; + //logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs } logSystem->oldLogData[i-1].epochEnd = lsConf.oldTLogs[i].epochEnd; } @@ -213,15 +222,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers) { coreSet.tLogs.push_back(log->get().id()); coreSet.tLogLocalities.push_back(log->get().interf().locality); } - coreSet.tLogWriteAntiQuorum = t.tLogWriteAntiQuorum; - coreSet.tLogReplicationFactor = t.tLogReplicationFactor; - coreSet.tLogPolicy = t.tLogPolicy; - coreSet.isLocal = t.isLocal; - coreSet.hasBest = t.hasBest; + coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum; + coreSet.tLogReplicationFactor = t->tLogReplicationFactor; + coreSet.tLogPolicy = t->tLogPolicy; + coreSet.isLocal = t->isLocal; + coreSet.hasBest = t->hasBest; newState.tLogs.push_back(coreSet); } @@ -231,15 +240,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers) { coreSet.tLogs.push_back(log->get().id()); } - coreSet.tLogLocalities = t.tLogLocalities; - coreSet.tLogWriteAntiQuorum = t.tLogWriteAntiQuorum; - coreSet.tLogReplicationFactor = t.tLogReplicationFactor; - coreSet.tLogPolicy = t.tLogPolicy; - coreSet.isLocal = t.isLocal; - coreSet.hasBest = t.hasBest; + coreSet.tLogLocalities = t->tLogLocalities; + coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum; + coreSet.tLogReplicationFactor = t->tLogReplicationFactor; + coreSet.tLogPolicy = t->tLogPolicy; + coreSet.isLocal = t->isLocal; + coreSet.hasBest = t->hasBest; newState.oldTLogData[i].tLogs.push_back(coreSet); } newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd; @@ -267,12 +276,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> failed; for( auto& it : tLogs ) { - for(auto &t : it.logServers) { + for(auto &t : it->logServers) { if( t->get().present() ) { failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) ); } } - for(auto &t : it.logRouters) { + for(auto &t : it->logRouters) { if( t->get().present() ) { failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) ); } @@ -287,19 +296,19 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> quorumResults; int location = 0; - for(auto it : tLogs) { - if(it.isLocal) { + for(auto& it : tLogs) { + if(it->isLocal) { vector> tLogCommitResults; - for(int loc=0; loc< it.logServers.size(); loc++) { + for(int loc=0; loc< it->logServers.size(); loc++) { Future commitMessage = reportTLogCommitErrors( - it.logServers[loc]->get().interf().commit.getReply( + it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( data.getArena(), prevVersion, version, knownCommittedVersion, data.getMessages(location), data.getTags(location), debugID ), TaskTLogCommitReply ), getDebugID()); actors.add(commitMessage); tLogCommitResults.push_back(commitMessage); location++; } - quorumResults.push_back( quorum( tLogCommitResults, tLogCommitResults.size() - it.tLogWriteAntiQuorum ) ); + quorumResults.push_back( quorum( tLogCommitResults, tLogCommitResults.size() - it->tLogWriteAntiQuorum ) ); } } @@ -307,18 +316,22 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted peek( Version begin, Tag tag, bool parallelGetMore ) { + if(tLogs.size() < 2) { + return Reference( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, false ) ); + } + if(tag >= SERVER_KNOBS->MAX_TAG) { //FIXME: non-static logRouters - return Reference( new ILogSystem::MergedPeekCursor( tLogs[1].logRouters, -1, (int)tLogs[1].logRouters.size(), tag, begin, getPeekEnd(), false ) ); + return Reference( new ILogSystem::MergedPeekCursor( tLogs[1]->logRouters, -1, (int)tLogs[1]->logRouters.size(), tag, begin, getPeekEnd(), false ) ); } else { if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) { - return Reference( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1].logServers.size() ? tLogs[1].bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) ); + return Reference( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1]->logServers.size() ? tLogs[1]->bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) ); } else { std::vector< Reference > cursors; std::vector< LogMessageVersion > epochEnds; - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1].logServers.size() ? tLogs[1].bestLocationFor( tag ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) ); + cursors.push_back( Reference( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1]->logServers.size() ? tLogs[1]->bestLocationFor( tag ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) ); for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) { - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1].logServers.size() ? oldLogData[i].tLogs[1].bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) ); + cursors.push_back( Reference( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1]->logServers.size() ? oldLogData[i].tLogs[1]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) ); epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd)); } @@ -329,19 +342,23 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted peekSingle( Version begin, Tag tag ) { ASSERT(tag < SERVER_KNOBS->MAX_TAG); + if(tLogs.size() < 2) { + return Reference( new ILogSystem::ServerPeekCursor( Reference>>(), tag, begin, getPeekEnd(), false, false ) ); + } + if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) { - return Reference( new ILogSystem::ServerPeekCursor( tLogs[1].logServers.size() ? - tLogs[1].logServers[tLogs[1].bestLocationFor( tag )] : + return Reference( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ? + tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] : Reference>>(), tag, begin, getPeekEnd(), false, false ) ); } else { TEST(true); //peekSingle used during non-copying tlog recovery std::vector< Reference > cursors; std::vector< LogMessageVersion > epochEnds; - cursors.push_back( Reference( new ILogSystem::ServerPeekCursor( tLogs[1].logServers.size() ? - tLogs[1].logServers[tLogs[1].bestLocationFor( tag )] : + cursors.push_back( Reference( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ? + tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] : Reference>>(), tag, oldLogData[0].epochEnd, getPeekEnd(), false, false) ) ); for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) { - cursors.push_back( Reference( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1].logServers.size() ? oldLogData[i].tLogs[1].bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, false)) ); + cursors.push_back( Reference( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1]->logServers.size() ? oldLogData[i].tLogs[1]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, false)) ); epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd)); } @@ -352,7 +369,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted>>>& popServers = tag >= SERVER_KNOBS->MAX_TAG ? t.logRouters : t.logServers; + std::vector>>>& popServers = tag >= SERVER_KNOBS->MAX_TAG ? t->logRouters : t->logServers; for(auto& log : popServers) { Version prev = outstandingPops[std::make_pair(log->get().id(),tag)]; if (prev < upTo) @@ -394,13 +411,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> quorumResults; for(auto& it : tLogs) { - if(it.isLocal) { + if(it->isLocal) { vector> alive; - for(auto& t : it.logServers) { + for(auto& t : it->logServers) { if( t->get().present() ) alive.push_back( brokenPromiseToNever( t->get().interf().confirmRunning.getReply(TLogConfirmRunningRequest(debugID), TaskTLogConfirmRunningReply ) ) ); else alive.push_back( Never() ); } - quorumResults.push_back( quorum( alive, alive.size() - it.tLogWriteAntiQuorum ) ); + quorumResults.push_back( quorum( alive, alive.size() - it->tLogWriteAntiQuorum ) ); } } @@ -417,48 +434,48 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSet = tLogs[i]; + log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; + log.tLogReplicationFactor = logSet->tLogReplicationFactor; + log.tLogPolicy = logSet->tLogPolicy; + log.tLogLocalities = logSet->tLogLocalities; + log.isLocal = logSet->isLocal; + log.hasBest = logSet->hasBest; - for( int i = 0; i < t.logServers.size(); i++ ) { - log.tLogs.push_back(t.logServers[i]->get()); + for( int i = 0; i < logSet->logServers.size(); i++ ) { + log.tLogs.push_back(logSet->logServers[i]->get()); } - for( int i = 0; i < t.logRouters.size(); i++ ) { - log.logRouters.push_back(t.logRouters[i]->get()); + for( int i = 0; i < logSet->logRouters.size(); i++ ) { + log.logRouters.push_back(logSet->logRouters[i]->get()); } - - logSystemConfig.tLogs.push_back(log); } if(!recoveryCompleteWrittenToCoreState) { for( int i = 0; i < oldLogData.size(); i++ ) { logSystemConfig.oldTLogs.push_back(OldTLogConf()); - for( auto& t : oldLogData[i].tLogs ) { - TLogSet log; - log.tLogWriteAntiQuorum = t.tLogWriteAntiQuorum; - log.tLogReplicationFactor = t.tLogReplicationFactor; - log.tLogPolicy = t.tLogPolicy; - log.tLogLocalities = t.tLogLocalities; - log.isLocal = t.isLocal; - log.hasBest = t.hasBest; + logSystemConfig.oldTLogs[i].tLogs.resize(oldLogData[i].tLogs.size()); + for( int j = 0; j < oldLogData[i].tLogs.size(); j++ ) { + TLogSet& log = logSystemConfig.oldTLogs[i].tLogs[j]; + Reference logSet = oldLogData[i].tLogs[j]; + log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; + log.tLogReplicationFactor = logSet->tLogReplicationFactor; + log.tLogPolicy = logSet->tLogPolicy; + log.tLogLocalities = logSet->tLogLocalities; + log.isLocal = logSet->isLocal; + log.hasBest = logSet->hasBest; - for( int i = 0; i < t.logServers.size(); i++ ) { - log.tLogs.push_back(t.logServers[i]->get()); + for( int i = 0; i < logSet->logServers.size(); i++ ) { + log.tLogs.push_back(logSet->logServers[i]->get()); } - for( int i = 0; i < t.logRouters.size(); i++ ) { - log.logRouters.push_back(t.logRouters[i]->get()); + for( int i = 0; i < logSet->logRouters.size(); i++ ) { + log.logRouters.push_back(logSet->logRouters[i]->get()); } - - logSystemConfig.oldTLogs[i].tLogs.push_back(log); } logSystemConfig.oldTLogs[i].epochEnd = oldLogData[i].epochEnd; } @@ -470,15 +487,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> logs; vector> oldLogs; for(auto& t : tLogs) { - for( int i = 0; i < t.logServers.size(); i++ ) { - logs.push_back(std::make_pair(t.logServers[i]->get().id(), t.logServers[i]->get().present() ? t.logServers[i]->get().interf().address() : NetworkAddress())); + for( int i = 0; i < t->logServers.size(); i++ ) { + logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress())); } } if(!recoveryCompleteWrittenToCoreState) { for( int i = 0; i < oldLogData.size(); i++ ) { for(auto& t : oldLogData[i].tLogs) { - for( int j = 0; j < t.logServers.size(); j++ ) { - oldLogs.push_back(std::make_pair(t.logServers[j]->get().id(), t.logServers[j]->get().present() ? t.logServers[j]->get().interf().address() : NetworkAddress())); + for( int j = 0; j < t->logServers.size(); j++ ) { + oldLogs.push_back(std::make_pair(t->logServers[j]->get().id(), t->logServers[j]->get().present() ? t->logServers[j]->get().interf().address() : NetworkAddress())); } } } @@ -490,14 +507,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> changes; changes.push_back(Never()); for(auto& t : tLogs) { - for( int i = 0; i < t.logServers.size(); i++ ) { - changes.push_back( t.logServers[i]->onChange() ); + for( int i = 0; i < t->logServers.size(); i++ ) { + changes.push_back( t->logServers[i]->onChange() ); } } for( int i = 0; i < oldLogData.size(); i++ ) { for(auto& t : oldLogData[i].tLogs) { - for( int j = 0; j < t.logServers.size(); j++ ) { - changes.push_back( t.logServers[j]->onChange() ); + for( int j = 0; j < t->logServers.size(); j++ ) { + changes.push_back( t->logServers[j]->onChange() ); } } } @@ -518,20 +535,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted originalTags, std::vector& tags ) { - tLogs[logSet].getPushLocations(originalTags, tags, SERVER_KNOBS->MAX_TAG); + tLogs[logSet]->getPushLocations(originalTags, tags, SERVER_KNOBS->MAX_TAG); } virtual void getPushLocations( std::vector const& tags, std::vector& locations ) { int locationOffset = 0; for(auto& log : tLogs) { - if(log.isLocal) { - log.getPushLocations(tags, locations, locationOffset); - locationOffset += log.logServers.size(); + if(log->isLocal) { + log->getPushLocations(tags, locations, locationOffset); + locationOffset += log->logServers.size(); } } } virtual Tag getRandomRouterTag() { + ASSERT(minRouters < 1e6); return SERVER_KNOBS->MIN_TAG - g_random->randomInt(0, minRouters); } @@ -570,55 +588,61 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted>> tLogReply; state std::vector>>> allLogServers; - state std::vector logServers; + state std::vector> logServers; state std::vector oldLogData; state std::vector>>> logFailed; state std::vector> failureTrackers; - for( auto& log : prevState.tLogs ) { - LogSet logSet; + + logServers.resize(prevState.tLogs.size()); + for( int i = 0; i < prevState.tLogs.size(); i++ ) { + Reference logSet = Reference( new LogSet() ); + logServers[i] = logSet; + CoreTLogSet const& coreSet = prevState.tLogs[i]; std::vector>> failed; - for(int i = 0; i < log.tLogs.size(); i++) { - Reference>> logVar = Reference>>( new AsyncVar>( OptionalInterface(log.tLogs[i]) ) ); - logSet.logServers.push_back( logVar ); + for(int j = 0; j < coreSet.tLogs.size(); j++) { + Reference>> logVar = Reference>>( new AsyncVar>( OptionalInterface(coreSet.tLogs[j]) ) ); + logSet->logServers.push_back( logVar ); allLogServers.push_back( logVar ); failed.push_back( Reference>( new AsyncVar() ) ); - failureTrackers.push_back( monitorLog(logVar, failed[i] ) ); + failureTrackers.push_back( monitorLog(logVar, failed[j] ) ); } - logSet.tLogReplicationFactor = log.tLogReplicationFactor; - logSet.tLogWriteAntiQuorum = log.tLogWriteAntiQuorum; - logSet.tLogPolicy = log.tLogPolicy; - logSet.tLogLocalities = log.tLogLocalities; - logSet.isLocal = log.isLocal; - logSet.hasBest = log.hasBest; - logServers.push_back(logSet); + logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor; + logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum; + logSet->tLogPolicy = coreSet.tLogPolicy; + logSet->tLogLocalities = coreSet.tLogLocalities; + logSet->isLocal = coreSet.isLocal; + logSet->hasBest = coreSet.hasBest; logFailed.push_back(failed); } - for( auto& old : prevState.oldTLogData ) { - OldLogData oldData; - for( auto& log : old.tLogs) { - LogSet logSet; - for(int j = 0; j < log.tLogs.size(); j++) { - Reference>> logVar = Reference>>( new AsyncVar>( OptionalInterface(log.tLogs[j]) ) ); - logSet.logServers.push_back( logVar ); + oldLogData.resize(prevState.oldTLogData.size()); + for( int i = 0; i < prevState.oldTLogData.size(); i++ ) { + OldLogData& oldData = oldLogData[i]; + OldTLogCoreData const& old = prevState.oldTLogData[i]; + oldData.tLogs.resize(old.tLogs.size()); + for( int j = 0; j < old.tLogs.size(); j++ ) { + Reference logSet = Reference( new LogSet() ); + oldData.tLogs[j] = logSet; + CoreTLogSet const& log = old.tLogs[j]; + for(int k = 0; k < log.tLogs.size(); k++) { + Reference>> logVar = Reference>>( new AsyncVar>( OptionalInterface(log.tLogs[k]) ) ); + logSet->logServers.push_back( logVar ); allLogServers.push_back( logVar ); } - logSet.tLogReplicationFactor = log.tLogReplicationFactor; - logSet.tLogWriteAntiQuorum = log.tLogWriteAntiQuorum; - logSet.tLogPolicy = log.tLogPolicy; - logSet.tLogLocalities = log.tLogLocalities; - logSet.isLocal = log.isLocal; - logSet.hasBest = log.hasBest; - oldData.tLogs.push_back(logSet); + logSet->tLogReplicationFactor = log.tLogReplicationFactor; + logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum; + logSet->tLogPolicy = log.tLogPolicy; + logSet->tLogLocalities = log.tLogLocalities; + logSet->isLocal = log.isLocal; + logSet->hasBest = log.hasBest; } oldData.epochEnd = old.epochEnd; - oldLogData.push_back(oldData); } state Future rejoins = trackRejoins( dbgid, allLogServers, rejoinRequests ); tLogReply.resize(logServers.size()); for( int i=0; i < logServers.size(); i++ ) { - for(int t=0; tlogServers.size(); t++) { + tLogReply[i].push_back( lockTLog( dbgid, logServers[i]->logServers[t]) ); } } @@ -629,7 +653,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted end; Version knownCommittedVersion = 0; for(int log = 0; log < logServers.size(); log++) { - if(!logServers[log].isLocal) { + if(!logServers[log]->isLocal) { continue; } @@ -637,10 +661,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted 0 && requiredCount <= logServers[log].logServers.size() ); - ASSERT( logServers[log].tLogReplicationFactor >= 1 && logServers[log].tLogReplicationFactor <= logServers[log].logServers.size() ); - ASSERT( logServers[log].tLogWriteAntiQuorum >= 0 && logServers[log].tLogWriteAntiQuorum < logServers[log].logServers.size() ); + state int requiredCount = (int)logServers[log]->logServers.size()+1 - logServers[log]->tLogReplicationFactor + logServers[log]->tLogWriteAntiQuorum; + ASSERT( requiredCount > 0 && requiredCount <= logServers[log]->logServers.size() ); + ASSERT( logServers[log]->tLogReplicationFactor >= 1 && logServers[log]->tLogReplicationFactor <= logServers[log]->logServers.size() ); + ASSERT( logServers[log]->tLogWriteAntiQuorum >= 0 && logServers[log]->tLogWriteAntiQuorum < logServers[log]->logServers.size() ); std::vector availableItems, badCombo; std::vector results; @@ -649,52 +673,52 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size(); t++) { if (tLogReply[log][t].isReady() && !tLogReply[log][t].isError() && !logFailed[log][t]->get()) { results.push_back(tLogReply[log][t].get()); - availableItems.push_back(logServers[log].tLogLocalities[t]); + availableItems.push_back(logServers[log]->tLogLocalities[t]); sServerState += 'a'; } else { - unResponsiveSet.add(logServers[log].tLogLocalities[t]); + unResponsiveSet.add(logServers[log]->tLogLocalities[t]); sServerState += 'f'; } } // Check if the list of results is not larger than the anti quorum - bool bTooManyFailures = (results.size() <= logServers[log].tLogWriteAntiQuorum); + bool bTooManyFailures = (results.size() <= logServers[log]->tLogWriteAntiQuorum); // Check if failed logs complete the policy - bTooManyFailures = bTooManyFailures || ((unResponsiveSet.size() >= logServers[log].tLogReplicationFactor) && (unResponsiveSet.validate(logServers[log].tLogPolicy))); + bTooManyFailures = bTooManyFailures || ((unResponsiveSet.size() >= logServers[log]->tLogReplicationFactor) && (unResponsiveSet.validate(logServers[log]->tLogPolicy))); // Check all combinations of the AntiQuorum within the failed - if ((!bTooManyFailures) && (logServers[log].tLogWriteAntiQuorum) && (!validateAllCombinations(badCombo, unResponsiveSet, logServers[log].tLogPolicy, availableItems, logServers[log].tLogWriteAntiQuorum, false))) { + if ((!bTooManyFailures) && (logServers[log]->tLogWriteAntiQuorum) && (!validateAllCombinations(badCombo, unResponsiveSet, logServers[log]->tLogPolicy, availableItems, logServers[log]->tLogWriteAntiQuorum, false))) { TraceEvent("EpochEndBadCombo", dbgid).detail("Cycles", cycles) .detail("logNum", log) .detail("Required", requiredCount) .detail("Present", results.size()) .detail("Available", availableItems.size()) - .detail("Absent", logServers[log].logServers.size() - results.size()) + .detail("Absent", logServers[log]->logServers.size() - results.size()) .detail("ServerState", sServerState) - .detail("ReplicationFactor", logServers[log].tLogReplicationFactor) - .detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum) - .detail("Policy", logServers[log].tLogPolicy->info()) + .detail("ReplicationFactor", logServers[log]->tLogReplicationFactor) + .detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum) + .detail("Policy", logServers[log]->tLogPolicy->info()) .detail("TooManyFailures", bTooManyFailures) - .detail("LogZones", ::describeZones(logServers[log].tLogLocalities)) - .detail("LogDataHalls", ::describeDataHalls(logServers[log].tLogLocalities)); + .detail("LogZones", ::describeZones(logServers[log]->tLogLocalities)) + .detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities)); bTooManyFailures = true; } // If too many TLogs are failed for recovery to be possible, we could wait forever here. //Void _ = wait( smartQuorum( tLogReply, requiredCount, SERVER_KNOBS->RECOVERY_TLOG_SMART_QUORUM_DELAY ) || rejoins ); - ASSERT(logServers[log].logServers.size() == tLogReply[log].size()); + ASSERT(logServers[log]->logServers.size() == tLogReply[log].size()); if (!bTooManyFailures) { std::sort( results.begin(), results.end(), sort_by_end() ); - int absent = logServers[log].logServers.size() - results.size(); - int safe_range_begin = logServers[log].tLogWriteAntiQuorum; - int new_safe_range_begin = std::min(logServers[log].tLogWriteAntiQuorum, (int)(results.size()-1)); - int safe_range_end = logServers[log].tLogReplicationFactor - absent; + int absent = logServers[log]->logServers.size() - results.size(); + int safe_range_begin = logServers[log]->tLogWriteAntiQuorum; + int new_safe_range_begin = std::min(logServers[log]->tLogWriteAntiQuorum, (int)(results.size()-1)); + int safe_range_end = logServers[log]->tLogReplicationFactor - absent; if( ( prevState.logSystemType == 2 && (!last_end.present() || ((safe_range_end > 0) && (safe_range_end-1 < results.size()) && results[ safe_range_end-1 ].end < last_end.get())) ) ) { knownCommittedVersion = std::max(knownCommittedVersion, end.get() - (g_network->isSimulated() ? 10*SERVER_KNOBS->VERSIONS_PER_SECOND : SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)); //In simulation this must be the maximum MAX_READ_TRANSACTION_LIFE_VERSIONS @@ -704,15 +728,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size()) .detail("Required", requiredCount) .detail("Present", results.size()) .detail("Available", availableItems.size()) - .detail("Absent", logServers[log].logServers.size() - results.size()) + .detail("Absent", logServers[log]->logServers.size() - results.size()) .detail("ServerState", sServerState) - .detail("ReplicationFactor", logServers[log].tLogReplicationFactor) - .detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum) - .detail("Policy", logServers[log].tLogPolicy->info()) + .detail("ReplicationFactor", logServers[log]->tLogReplicationFactor) + .detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum) + .detail("Policy", logServers[log]->tLogPolicy->info()) .detail("TooManyFailures", bTooManyFailures) .detail("LastVersion", (last_end.present()) ? last_end.get() : -1L) .detail("RecoveryVersion", ((safe_range_end > 0) && (safe_range_end-1 < results.size())) ? results[ safe_range_end-1 ].end : -1) @@ -720,8 +744,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities)) + .detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities)) .detail("tLogs", (int)prevState.tLogs.size()) .detail("oldTlogsSize", (int)prevState.oldTLogData.size()) .detail("logSystemType", prevState.logSystemType) @@ -734,14 +758,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size()) .detail("Required", requiredCount) .detail("Present", results.size()) .detail("Available", availableItems.size()) .detail("ServerState", sServerState) - .detail("ReplicationFactor", logServers[log].tLogReplicationFactor) - .detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum) - .detail("Policy", logServers[log].tLogPolicy->info()) + .detail("ReplicationFactor", logServers[log]->tLogReplicationFactor) + .detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum) + .detail("Policy", logServers[log]->tLogPolicy->info()) .detail("TooManyFailures", bTooManyFailures) .detail("LastVersion", (last_end.present()) ? last_end.get() : -1L) .detail("RecoveryVersion", ((safe_range_end > 0) && (safe_range_end-1 < results.size())) ? results[ safe_range_end-1 ].end : -1) @@ -749,8 +773,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogLocalities)) + .detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities)); } } // Too many failures @@ -759,17 +783,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.size()) .detail("Required", requiredCount) .detail("Present", results.size()) .detail("Available", availableItems.size()) .detail("ServerState", sServerState) - .detail("ReplicationFactor", logServers[log].tLogReplicationFactor) - .detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum) - .detail("Policy", logServers[log].tLogPolicy->info()) + .detail("ReplicationFactor", logServers[log]->tLogReplicationFactor) + .detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum) + .detail("Policy", logServers[log]->tLogPolicy->info()) .detail("TooManyFailures", bTooManyFailures) - .detail("LogZones", ::describeZones(logServers[log].tLogLocalities)) - .detail("LogDataHalls", ::describeDataHalls(logServers[log].tLogLocalities)); + .detail("LogZones", ::describeZones(logServers[log]->tLogLocalities)) + .detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities)); } } @@ -800,12 +824,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> changes; for(int i=0; i < logServers.size(); i++) { - if(logServers[i].isLocal) { - for(int j=0; j < logServers[i].logServers.size(); j++) { + if(logServers[i]->isLocal) { + for(int j=0; j < logServers[i]->logServers.size(); j++) { if (!tLogReply[i][j].isReady()) changes.push_back( ready(tLogReply[i][j]) ); else { - changes.push_back( logServers[i].logServers[j]->onChange() ); + changes.push_back( logServers[i]->logServers[j]->onChange() ); changes.push_back( logFailed[i][j]->onChange() ); } } @@ -832,7 +856,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[logNum].logRouters.push_back( Reference>>( new AsyncVar>( OptionalInterface(logRouterInitializationReplies[i].get()) ) ) ); + self->tLogs[logNum]->logRouters.push_back( Reference>>( new AsyncVar>( OptionalInterface(logRouterInitializationReplies[i].get()) ) ) ); } self->logSystemConfigChanges.trigger(); @@ -853,14 +877,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedMAX_TAG + i; } - self->tLogs[logNum].tLogLocalities.resize( remoteTLogWorkers.size() ); - self->tLogs[logNum].logServers.resize( remoteTLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size - self->tLogs[logNum].updateLocalitySet(remoteTLogWorkers); + self->tLogs[logNum]->tLogLocalities.resize( remoteTLogWorkers.size() ); + self->tLogs[logNum]->logServers.resize( remoteTLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size + self->tLogs[logNum]->updateLocalitySet(remoteTLogWorkers); vector locations; for( Tag tag : self->epochEndTags ) { locations.clear(); - self->tLogs[logNum].getPushLocations( vector(1, tag), locations, 0 ); + self->tLogs[logNum]->getPushLocations( vector(1, tag), locations, 0 ); for(int loc : locations) remoteTLogReqs[ loc ].recoverTags.push_back( tag ); } @@ -871,16 +895,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[logNum].logServers.push_back( Reference>>( new AsyncVar>( OptionalInterface(remoteTLogInitializationReplies[i].get()) ) ) ); + self->tLogs[logNum]->logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(remoteTLogInitializationReplies[i].get()) ) ); + self->tLogs[logNum]->tLogLocalities[i] = remoteTLogWorkers[i].locality; } std::vector> recoveryComplete; - for( int i = 0; i < self->tLogs[logNum].logServers.size(); i++) - recoveryComplete.push_back( transformErrors( throwErrorOr( self->tLogs[logNum].logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + for( int i = 0; i < self->tLogs[logNum]->logServers.size(); i++) + recoveryComplete.push_back( transformErrors( throwErrorOr( self->tLogs[logNum]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); Void _ = wait( waitForAll(recoveryComplete) ); - self->tLogs[logNum].logRouters.resize(logRouterWorkers.size()); + self->tLogs[logNum]->logRouters.resize(logRouterWorkers.size()); return Void(); } @@ -894,12 +919,18 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogSystemType = 2; logSystem->tLogs.resize(2); - logSystem->tLogs[0].tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum; - logSystem->tLogs[0].tLogReplicationFactor = configuration.tLogReplicationFactor; - logSystem->tLogs[0].tLogPolicy = configuration.tLogPolicy; + logSystem->tLogs[0] = Reference( new LogSet() ); + logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum; + logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor; + logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy; + logSystem->tLogs[0]->isLocal = true; + logSystem->tLogs[0]->hasBest = true; - logSystem->tLogs[1].tLogReplicationFactor = configuration.remoteTLogReplicationFactor; - logSystem->tLogs[1].tLogPolicy = configuration.remoteTLogPolicy; + logSystem->tLogs[1] = Reference( new LogSet() ); + logSystem->tLogs[1]->tLogReplicationFactor = configuration.remoteTLogReplicationFactor; + logSystem->tLogs[1]->tLogPolicy = configuration.remoteTLogPolicy; + logSystem->tLogs[1]->isLocal = false; + logSystem->tLogs[1]->hasBest = true; if(oldLogSystem->tLogs.size()) { logSystem->oldLogData.push_back(OldLogData()); @@ -924,16 +955,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0].tLogLocalities.resize( tLogWorkers.size() ); - logSystem->tLogs[0].logServers.resize( tLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size - logSystem->tLogs[0].updateLocalitySet(tLogWorkers); + logSystem->tLogs[0]->tLogLocalities.resize( tLogWorkers.size() ); + logSystem->tLogs[0]->logServers.resize( tLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size + logSystem->tLogs[0]->updateLocalitySet(tLogWorkers); std::vector locations; state Tag minTag = 0; for( Tag tag : oldLogSystem->getEpochEndTags() ) { minTag = std::min(minTag, tag); locations.clear(); - logSystem->tLogs[0].getPushLocations( vector(1, tag), locations, 0 ); + logSystem->tLogs[0]->getPushLocations( vector(1, tag), locations, 0 ); for(int loc : locations) reqs[ loc ].recoverTags.push_back( tag ); } @@ -944,16 +975,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[0].logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(initializationReplies[i].get()) ) ); - logSystem->tLogs[0].tLogLocalities[i] = tLogWorkers[i].locality; + logSystem->tLogs[0]->logServers[i] = Reference>>( new AsyncVar>( OptionalInterface(initializationReplies[i].get()) ) ); + logSystem->tLogs[0]->tLogLocalities[i] = tLogWorkers[i].locality; } //Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running recoveries causing tests to timeout if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed(); std::vector> recoveryComplete; - for( int i = 0; i < logSystem->tLogs[0].logServers.size(); i++) - recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0].logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); + for( int i = 0; i < logSystem->tLogs[0]->logServers.size(); i++) + recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) ); logSystem->recoveryComplete = waitForAll(recoveryComplete); logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, oldLogSystem->epochEndVersion.get(), minTag, 1);