Made LogSet reference counted.

Fixed a few bugs.
This commit is contained in:
Evan Tschannen 2017-07-11 15:48:10 -07:00
parent 81ae263ad9
commit 415458deef
3 changed files with 281 additions and 244 deletions

View File

@ -38,7 +38,7 @@ void uniquify( Collection& c ) {
c.resize( std::unique(c.begin(), c.end()) - c.begin() );
}
class LogSet {
class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
public:
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logServers;
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
@ -327,7 +327,7 @@ struct ILogSystem {
};
struct SetPeekCursor : IPeekCursor, ReferenceCounted<SetPeekCursor> {
std::vector<LogSet> logSets;
std::vector<Reference<LogSet>> logSets;
std::vector< std::vector< Reference<IPeekCursor> > > serverCursors;
Tag tag;
int bestSet, bestServer, currentSet, currentCursor;
@ -339,7 +339,7 @@ struct ILogSystem {
bool useBestSet;
UID randomID;
SetPeekCursor( std::vector<LogSet> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore );
SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore );
virtual Reference<IPeekCursor> cloneNoMore();
@ -532,7 +532,13 @@ struct LogPushData : NonCopyable {
// Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version
explicit LogPushData(Reference<ILogSystem> logSystem) : logSystem(logSystem), subsequence(1) {
tags.resize( logSystem->getLogSystemConfig().tLogs.size() );
int totalSize = 0;
for(auto& log : logSystem->getLogSystemConfig().tLogs) {
if(log.isLocal) {
totalSize += log.tLogs.size();
}
}
tags.resize( totalSize );
for(int i = 0; i < tags.size(); i++) {
messagesWriter.push_back( BinaryWriter( AssumeVersion(currentProtocolVersion) ) );
}

View File

@ -411,13 +411,13 @@ Version ILogSystem::MergedPeekCursor::popped() {
return poppedVersion;
}
ILogSystem::SetPeekCursor::SetPeekCursor( std::vector<LogSet> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore )
ILogSystem::SetPeekCursor::SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore )
: logSets(logSets), bestSet(bestSet), bestServer(bestServer), tag(tag), currentCursor(0), currentSet(bestSet), hasNextMessage(false), messageVersion(begin), useBestSet(true), randomID(g_random->randomUniqueID()) {
serverCursors.resize(logSets.size());
int maxServers = 0;
for( int i = 0; i < logSets.size(); i++ ) {
for( int j = 0; j < logSets[i].logServers.size(); j++) {
Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logSets[i].logServers[j], tag, begin, end, true, parallelGetMore ) );
for( int j = 0; j < logSets[i]->logServers.size(); j++) {
Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logSets[i]->logServers[j], tag, begin, end, true, parallelGetMore ) );
serverCursors[i].push_back( cursor );
}
maxServers = std::max<int>(maxServers, serverCursors[i].size());
@ -506,18 +506,18 @@ void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) {
std::sort(sortedVersions.begin(), sortedVersions.end());
for(auto sortedVersion : sortedVersions) {
auto& locality = logSets[logIdx].tLogLocalities[sortedVersion.second];
auto& locality = logSets[logIdx]->tLogLocalities[sortedVersion.second];
localityGroup.add(locality);
if( localityGroup.size() >= logSets[logIdx].tLogReplicationFactor && localityGroup.validate(logSets[logIdx].tLogPolicy) ) {
if( localityGroup.size() >= logSets[logIdx]->tLogReplicationFactor && localityGroup.validate(logSets[logIdx]->tLogPolicy) ) {
messageVersion = sortedVersion.first;
break;
}
}
} else {
//(int)oldLogData[i].logServers.size() + 1 - oldLogData[i].tLogReplicationFactor
std::nth_element(sortedVersions.begin(), sortedVersions.end()-(logSets[logIdx].logServers.size()+1-logSets[logIdx].tLogReplicationFactor), sortedVersions.end());
messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx].logServers.size()+1-logSets[logIdx].tLogReplicationFactor)].first;
std::nth_element(sortedVersions.begin(), sortedVersions.end()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor), sortedVersions.end());
messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor)].first;
}
for(int i = 0; i < serverCursors[logIdx].size(); i++) {
@ -586,10 +586,10 @@ ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer
self->localityGroup.clear();
for( int i = 0; i < self->serverCursors[self->bestSet].size(); i++) {
if(!self->serverCursors[self->bestSet][i]->isActive()) {
self->localityGroup.add(self->logSets[self->bestSet].tLogLocalities[i]);
self->localityGroup.add(self->logSets[self->bestSet]->tLogLocalities[i]);
}
}
bestSetValid = self->localityGroup.size() < self->logSets[self->bestSet].tLogReplicationFactor || !self->localityGroup.validate(self->logSets[self->bestSet].tLogPolicy);
bestSetValid = self->localityGroup.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->localityGroup.validate(self->logSets[self->bestSet]->tLogPolicy);
}
if(bestSetValid) {
vector<Future<Void>> q;

View File

@ -43,7 +43,7 @@ ACTOR static Future<Void> reportTLogCommitErrors( Future<Void> commitReply, UID
}
struct OldLogData {
std::vector<LogSet> tLogs;
std::vector<Reference<LogSet>> tLogs;
Version epochEnd;
OldLogData() : epochEnd(0) {}
@ -52,7 +52,7 @@ struct OldLogData {
struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
UID dbgid;
int logSystemType;
std::vector<LogSet> tLogs;
std::vector<Reference<LogSet>> tLogs;
int minRouters;
// new members
@ -88,8 +88,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::string result;
for( int i = 0; i < tLogs.size(); i++ ) {
result = format("%d: ", i);
for( int j = 0; j < tLogs[i].logServers.size(); j++) {
result = result + tLogs[i].logServers[j]->get().id().toString() + ((j == tLogs[i].logServers.size() - 1) ? " " : ", ");
for( int j = 0; j < tLogs[i]->logServers.size(); j++) {
result = result + tLogs[i]->logServers[j]->get().id().toString() + ((j == tLogs[i]->logServers.size() - 1) ? " " : ", ");
}
}
return result;
@ -108,43 +108,48 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
//ASSERT(lsConf.epoch == epoch); //< FIXME
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality) );
for( auto& it : lsConf.tLogs ) {
LogSet logSet;
for( auto& log : it.tLogs) {
logSet.logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
logSystem->tLogs.resize(lsConf.tLogs.size());
for( int i = 0; i < lsConf.tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->tLogs[i] = logSet;
TLogSet const& tLogSet = lsConf.tLogs[i];
for( auto& log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto& log : it.logRouters) {
logSet.logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
for( auto& log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum;
logSet.tLogReplicationFactor = it.tLogReplicationFactor;
logSet.tLogPolicy = it.tLogPolicy;
logSet.tLogLocalities = it.tLogLocalities;
logSet.isLocal = it.isLocal;
logSet.hasBest = it.hasBest;
logSet.updateLocalitySet();
logSystem->tLogs.push_back(logSet);
if(logSet.logRouters.size() > 0) logSystem->minRouters = std::min<int>(logSystem->minRouters, logSet.logRouters.size());
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBest = tLogSet.hasBest;
logSet->updateLocalitySet();
if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min<int>(logSystem->minRouters, logSet->logRouters.size());
}
logSystem->oldLogData.resize(lsConf.oldTLogs.size());
for( int i = 0; i < lsConf.oldTLogs.size(); i++ ) {
for( auto& it : lsConf.oldTLogs[i].tLogs ) {
LogSet logSet;
for( auto & log : it.tLogs) {
logSet.logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
logSystem->oldLogData[i].tLogs.resize(lsConf.oldTLogs[i].tLogs.size());
for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->oldLogData[i].tLogs[j] = logSet;
TLogSet const& tLogData = lsConf.oldTLogs[i].tLogs[j];
for( auto & log : tLogData.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : it.logRouters) {
logSet.logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
for( auto & log : tLogData.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum;
logSet.tLogReplicationFactor = it.tLogReplicationFactor;
logSet.tLogPolicy = it.tLogPolicy;
logSet.tLogLocalities = it.tLogLocalities;
logSet.isLocal = it.isLocal;
logSet.hasBest = it.hasBest;
logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor;
logSet->tLogPolicy = tLogData.tLogPolicy;
logSet->tLogLocalities = tLogData.tLogLocalities;
logSet->isLocal = tLogData.isLocal;
logSet->hasBest = tLogData.hasBest;
//logSet.UpdateLocalitySet(); we do not update the locality set, since we never push to old logs
logSystem->oldLogData[i].tLogs.push_back(logSet);
}
logSystem->oldLogData[i].epochEnd = lsConf.oldTLogs[i].epochEnd;
}
@ -159,44 +164,48 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality) );
if(lsConf.oldTLogs.size()) {
for( auto& it : lsConf.oldTLogs[0].tLogs ) {
LogSet logSet;
for( auto & log : it.tLogs) {
logSet.logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
logSystem->tLogs.resize( lsConf.oldTLogs[0].tLogs.size());
for( int i = 0; i < lsConf.oldTLogs[0].tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->tLogs[i] = logSet;
TLogSet const& tLogSet = lsConf.oldTLogs[0].tLogs[i];
for( auto & log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : it.logRouters) {
logSet.logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum;
logSet.tLogReplicationFactor = it.tLogReplicationFactor;
logSet.tLogPolicy = it.tLogPolicy;
logSet.tLogLocalities = it.tLogLocalities;
logSet.isLocal = it.isLocal;
logSet.hasBest = it.hasBest;
//logSet.updateLocalitySet(); we do not update the locality set, since we never push to old logs
logSystem->tLogs.push_back(logSet);
if(logSet.logRouters.size() > 0) logSystem->minRouters = std::min<int>(logSystem->minRouters, logSet.logRouters.size());
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBest = tLogSet.hasBest;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
if(logSet->logRouters.size() > 0) logSystem->minRouters = std::min<int>(logSystem->minRouters, logSet->logRouters.size());
}
//logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd;
logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1);
for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) {
for( auto& it : lsConf.oldTLogs[i].tLogs ) {
LogSet logSet;
for( auto & log : it.tLogs) {
logSet.logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
logSystem->oldLogData[i-1].tLogs.resize(lsConf.oldTLogs[i].tLogs.size());
for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->oldLogData[i-1].tLogs[j] = logSet;
TLogSet const& tLogSet = lsConf.oldTLogs[i].tLogs[j];
for( auto & log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : it.logRouters) {
logSet.logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet.tLogWriteAntiQuorum = it.tLogWriteAntiQuorum;
logSet.tLogReplicationFactor = it.tLogReplicationFactor;
logSet.tLogPolicy = it.tLogPolicy;
logSet.tLogLocalities = it.tLogLocalities;
logSet.isLocal = it.isLocal;
logSet.hasBest = it.hasBest;
//logSet.updateLocalitySet(); we do not update the locality set, since we never push to old logs
logSystem->oldLogData[i-1].tLogs.push_back(logSet);
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->hasBest = tLogSet.hasBest;
//logSet->updateLocalitySet(); we do not update the locality set, since we never push to old logs
}
logSystem->oldLogData[i-1].epochEnd = lsConf.oldTLogs[i].epochEnd;
}
@ -213,15 +222,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
newState.tLogs.clear();
for(auto &t : tLogs) {
CoreTLogSet coreSet;
for(auto &log : t.logServers) {
for(auto &log : t->logServers) {
coreSet.tLogs.push_back(log->get().id());
coreSet.tLogLocalities.push_back(log->get().interf().locality);
}
coreSet.tLogWriteAntiQuorum = t.tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t.tLogReplicationFactor;
coreSet.tLogPolicy = t.tLogPolicy;
coreSet.isLocal = t.isLocal;
coreSet.hasBest = t.hasBest;
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBest = t->hasBest;
newState.tLogs.push_back(coreSet);
}
@ -231,15 +240,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(int i = 0; i < oldLogData.size(); i++) {
for(auto &t : oldLogData[i].tLogs) {
CoreTLogSet coreSet;
for(auto &log : t.logServers) {
for(auto &log : t->logServers) {
coreSet.tLogs.push_back(log->get().id());
}
coreSet.tLogLocalities = t.tLogLocalities;
coreSet.tLogWriteAntiQuorum = t.tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t.tLogReplicationFactor;
coreSet.tLogPolicy = t.tLogPolicy;
coreSet.isLocal = t.isLocal;
coreSet.hasBest = t.hasBest;
coreSet.tLogLocalities = t->tLogLocalities;
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.hasBest = t->hasBest;
newState.oldTLogData[i].tLogs.push_back(coreSet);
}
newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd;
@ -267,12 +276,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<Future<Void>> failed;
for( auto& it : tLogs ) {
for(auto &t : it.logServers) {
for(auto &t : it->logServers) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
}
}
for(auto &t : it.logRouters) {
for(auto &t : it->logRouters) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
}
@ -287,19 +296,19 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// FIXME: Randomize request order as in LegacyLogSystem?
vector<Future<Void>> quorumResults;
int location = 0;
for(auto it : tLogs) {
if(it.isLocal) {
for(auto& it : tLogs) {
if(it->isLocal) {
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it.logServers.size(); loc++) {
for(int loc=0; loc< it->logServers.size(); loc++) {
Future<Void> commitMessage = reportTLogCommitErrors(
it.logServers[loc]->get().interf().commit.getReply(
it->logServers[loc]->get().interf().commit.getReply(
TLogCommitRequest( data.getArena(), prevVersion, version, knownCommittedVersion, data.getMessages(location), data.getTags(location), debugID ), TaskTLogCommitReply ),
getDebugID());
actors.add(commitMessage);
tLogCommitResults.push_back(commitMessage);
location++;
}
quorumResults.push_back( quorum( tLogCommitResults, tLogCommitResults.size() - it.tLogWriteAntiQuorum ) );
quorumResults.push_back( quorum( tLogCommitResults, tLogCommitResults.size() - it->tLogWriteAntiQuorum ) );
}
}
@ -307,18 +316,22 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual Reference<IPeekCursor> peek( Version begin, Tag tag, bool parallelGetMore ) {
if(tLogs.size() < 2) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
if(tag >= SERVER_KNOBS->MAX_TAG) {
//FIXME: non-static logRouters
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[1].logRouters, -1, (int)tLogs[1].logRouters.size(), tag, begin, getPeekEnd(), false ) );
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[1]->logRouters, -1, (int)tLogs[1]->logRouters.size(), tag, begin, getPeekEnd(), false ) );
} else {
if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) {
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1].logServers.size() ? tLogs[1].bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) );
return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1]->logServers.size() ? tLogs[1]->bestLocationFor( tag ) : -1, tag, begin, getPeekEnd(), parallelGetMore ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1].logServers.size() ? tLogs[1].bestLocationFor( tag ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) );
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( tLogs, 1, tLogs[1]->logServers.size() ? tLogs[1]->bestLocationFor( tag ) : -1, tag, oldLogData[0].epochEnd, getPeekEnd(), parallelGetMore)) );
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1].logServers.size() ? oldLogData[i].tLogs[1].bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) );
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1]->logServers.size() ? oldLogData[i].tLogs[1]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, parallelGetMore)) );
epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd));
}
@ -329,19 +342,23 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
virtual Reference<IPeekCursor> peekSingle( Version begin, Tag tag ) {
ASSERT(tag < SERVER_KNOBS->MAX_TAG);
if(tLogs.size() < 2) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
if(oldLogData.size() == 0 || begin >= oldLogData[0].epochEnd) {
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[1].logServers.size() ?
tLogs[1].logServers[tLogs[1].bestLocationFor( tag )] :
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ?
tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] :
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
} else {
TEST(true); //peekSingle used during non-copying tlog recovery
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
cursors.push_back( Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[1].logServers.size() ?
tLogs[1].logServers[tLogs[1].bestLocationFor( tag )] :
cursors.push_back( Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[1]->logServers.size() ?
tLogs[1]->logServers[tLogs[1]->bestLocationFor( tag )] :
Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, oldLogData[0].epochEnd, getPeekEnd(), false, false) ) );
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1].logServers.size() ? oldLogData[i].tLogs[1].bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, false)) );
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( oldLogData[i].tLogs, 1, oldLogData[i].tLogs[1]->logServers.size() ? oldLogData[i].tLogs[1]->bestLocationFor( tag ) : -1, tag, i+1 == oldLogData.size() ? begin : std::max(oldLogData[i+1].epochEnd, begin), oldLogData[i].epochEnd, false)) );
epochEnds.push_back(LogMessageVersion(oldLogData[i].epochEnd));
}
@ -352,7 +369,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
virtual void pop( Version upTo, Tag tag ) {
if (!upTo) return;
for(auto& t : tLogs) {
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>>& popServers = tag >= SERVER_KNOBS->MAX_TAG ? t.logRouters : t.logServers;
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>>& popServers = tag >= SERVER_KNOBS->MAX_TAG ? t->logRouters : t->logServers;
for(auto& log : popServers) {
Version prev = outstandingPops[std::make_pair(log->get().id(),tag)];
if (prev < upTo)
@ -394,13 +411,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// FIXME: This is way too conservative?
vector<Future<Void>> quorumResults;
for(auto& it : tLogs) {
if(it.isLocal) {
if(it->isLocal) {
vector<Future<Void>> alive;
for(auto& t : it.logServers) {
for(auto& t : it->logServers) {
if( t->get().present() ) alive.push_back( brokenPromiseToNever( t->get().interf().confirmRunning.getReply(TLogConfirmRunningRequest(debugID), TaskTLogConfirmRunningReply ) ) );
else alive.push_back( Never() );
}
quorumResults.push_back( quorum( alive, alive.size() - it.tLogWriteAntiQuorum ) );
quorumResults.push_back( quorum( alive, alive.size() - it->tLogWriteAntiQuorum ) );
}
}
@ -417,48 +434,48 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
LogSystemConfig logSystemConfig;
logSystemConfig.logSystemType = logSystemType;
for( auto& t : tLogs ) {
TLogSet log;
log.tLogWriteAntiQuorum = t.tLogWriteAntiQuorum;
log.tLogReplicationFactor = t.tLogReplicationFactor;
log.tLogPolicy = t.tLogPolicy;
log.tLogLocalities = t.tLogLocalities;
log.isLocal = t.isLocal;
log.hasBest = t.hasBest;
logSystemConfig.tLogs.resize(tLogs.size());
for( int i = 0; i < tLogs.size(); i++ ) {
TLogSet& log = logSystemConfig.tLogs[i];
Reference<LogSet> logSet = tLogs[i];
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBest = logSet->hasBest;
for( int i = 0; i < t.logServers.size(); i++ ) {
log.tLogs.push_back(t.logServers[i]->get());
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
}
for( int i = 0; i < t.logRouters.size(); i++ ) {
log.logRouters.push_back(t.logRouters[i]->get());
for( int i = 0; i < logSet->logRouters.size(); i++ ) {
log.logRouters.push_back(logSet->logRouters[i]->get());
}
logSystemConfig.tLogs.push_back(log);
}
if(!recoveryCompleteWrittenToCoreState) {
for( int i = 0; i < oldLogData.size(); i++ ) {
logSystemConfig.oldTLogs.push_back(OldTLogConf());
for( auto& t : oldLogData[i].tLogs ) {
TLogSet log;
log.tLogWriteAntiQuorum = t.tLogWriteAntiQuorum;
log.tLogReplicationFactor = t.tLogReplicationFactor;
log.tLogPolicy = t.tLogPolicy;
log.tLogLocalities = t.tLogLocalities;
log.isLocal = t.isLocal;
log.hasBest = t.hasBest;
logSystemConfig.oldTLogs[i].tLogs.resize(oldLogData[i].tLogs.size());
for( int j = 0; j < oldLogData[i].tLogs.size(); j++ ) {
TLogSet& log = logSystemConfig.oldTLogs[i].tLogs[j];
Reference<LogSet> logSet = oldLogData[i].tLogs[j];
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.hasBest = logSet->hasBest;
for( int i = 0; i < t.logServers.size(); i++ ) {
log.tLogs.push_back(t.logServers[i]->get());
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
}
for( int i = 0; i < t.logRouters.size(); i++ ) {
log.logRouters.push_back(t.logRouters[i]->get());
for( int i = 0; i < logSet->logRouters.size(); i++ ) {
log.logRouters.push_back(logSet->logRouters[i]->get());
}
logSystemConfig.oldTLogs[i].tLogs.push_back(log);
}
logSystemConfig.oldTLogs[i].epochEnd = oldLogData[i].epochEnd;
}
@ -470,15 +487,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<std::pair<UID, NetworkAddress>> logs;
vector<std::pair<UID, NetworkAddress>> oldLogs;
for(auto& t : tLogs) {
for( int i = 0; i < t.logServers.size(); i++ ) {
logs.push_back(std::make_pair(t.logServers[i]->get().id(), t.logServers[i]->get().present() ? t.logServers[i]->get().interf().address() : NetworkAddress()));
for( int i = 0; i < t->logServers.size(); i++ ) {
logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress()));
}
}
if(!recoveryCompleteWrittenToCoreState) {
for( int i = 0; i < oldLogData.size(); i++ ) {
for(auto& t : oldLogData[i].tLogs) {
for( int j = 0; j < t.logServers.size(); j++ ) {
oldLogs.push_back(std::make_pair(t.logServers[j]->get().id(), t.logServers[j]->get().present() ? t.logServers[j]->get().interf().address() : NetworkAddress()));
for( int j = 0; j < t->logServers.size(); j++ ) {
oldLogs.push_back(std::make_pair(t->logServers[j]->get().id(), t->logServers[j]->get().present() ? t->logServers[j]->get().interf().address() : NetworkAddress()));
}
}
}
@ -490,14 +507,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<Future<Void>> changes;
changes.push_back(Never());
for(auto& t : tLogs) {
for( int i = 0; i < t.logServers.size(); i++ ) {
changes.push_back( t.logServers[i]->onChange() );
for( int i = 0; i < t->logServers.size(); i++ ) {
changes.push_back( t->logServers[i]->onChange() );
}
}
for( int i = 0; i < oldLogData.size(); i++ ) {
for(auto& t : oldLogData[i].tLogs) {
for( int j = 0; j < t.logServers.size(); j++ ) {
changes.push_back( t.logServers[j]->onChange() );
for( int j = 0; j < t->logServers.size(); j++ ) {
changes.push_back( t->logServers[j]->onChange() );
}
}
}
@ -518,20 +535,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual void addRemoteTags( int logSet, std::vector<Tag> originalTags, std::vector<int>& tags ) {
tLogs[logSet].getPushLocations(originalTags, tags, SERVER_KNOBS->MAX_TAG);
tLogs[logSet]->getPushLocations(originalTags, tags, SERVER_KNOBS->MAX_TAG);
}
virtual void getPushLocations( std::vector<Tag> const& tags, std::vector<int>& locations ) {
int locationOffset = 0;
for(auto& log : tLogs) {
if(log.isLocal) {
log.getPushLocations(tags, locations, locationOffset);
locationOffset += log.logServers.size();
if(log->isLocal) {
log->getPushLocations(tags, locations, locationOffset);
locationOffset += log->logServers.size();
}
}
}
virtual Tag getRandomRouterTag() {
ASSERT(minRouters < 1e6);
return SERVER_KNOBS->MIN_TAG - g_random->randomInt(0, minRouters);
}
@ -570,55 +588,61 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// trackRejoins listens for rejoin requests from the tLogs that we are recovering from, to learn their TLogInterfaces
state std::vector<std::vector<Future<TLogLockResult>>> tLogReply;
state std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> allLogServers;
state std::vector<LogSet> logServers;
state std::vector<Reference<LogSet>> logServers;
state std::vector<OldLogData> oldLogData;
state std::vector<std::vector<Reference<AsyncVar<bool>>>> logFailed;
state std::vector<Future<Void>> failureTrackers;
for( auto& log : prevState.tLogs ) {
LogSet logSet;
logServers.resize(prevState.tLogs.size());
for( int i = 0; i < prevState.tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logServers[i] = logSet;
CoreTLogSet const& coreSet = prevState.tLogs[i];
std::vector<Reference<AsyncVar<bool>>> failed;
for(int i = 0; i < log.tLogs.size(); i++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(log.tLogs[i]) ) );
logSet.logServers.push_back( logVar );
for(int j = 0; j < coreSet.tLogs.size(); j++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(coreSet.tLogs[j]) ) );
logSet->logServers.push_back( logVar );
allLogServers.push_back( logVar );
failed.push_back( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) );
failureTrackers.push_back( monitorLog(logVar, failed[i] ) );
failureTrackers.push_back( monitorLog(logVar, failed[j] ) );
}
logSet.tLogReplicationFactor = log.tLogReplicationFactor;
logSet.tLogWriteAntiQuorum = log.tLogWriteAntiQuorum;
logSet.tLogPolicy = log.tLogPolicy;
logSet.tLogLocalities = log.tLogLocalities;
logSet.isLocal = log.isLocal;
logSet.hasBest = log.hasBest;
logServers.push_back(logSet);
logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum;
logSet->tLogPolicy = coreSet.tLogPolicy;
logSet->tLogLocalities = coreSet.tLogLocalities;
logSet->isLocal = coreSet.isLocal;
logSet->hasBest = coreSet.hasBest;
logFailed.push_back(failed);
}
for( auto& old : prevState.oldTLogData ) {
OldLogData oldData;
for( auto& log : old.tLogs) {
LogSet logSet;
for(int j = 0; j < log.tLogs.size(); j++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(log.tLogs[j]) ) );
logSet.logServers.push_back( logVar );
oldLogData.resize(prevState.oldTLogData.size());
for( int i = 0; i < prevState.oldTLogData.size(); i++ ) {
OldLogData& oldData = oldLogData[i];
OldTLogCoreData const& old = prevState.oldTLogData[i];
oldData.tLogs.resize(old.tLogs.size());
for( int j = 0; j < old.tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
oldData.tLogs[j] = logSet;
CoreTLogSet const& log = old.tLogs[j];
for(int k = 0; k < log.tLogs.size(); k++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(log.tLogs[k]) ) );
logSet->logServers.push_back( logVar );
allLogServers.push_back( logVar );
}
logSet.tLogReplicationFactor = log.tLogReplicationFactor;
logSet.tLogWriteAntiQuorum = log.tLogWriteAntiQuorum;
logSet.tLogPolicy = log.tLogPolicy;
logSet.tLogLocalities = log.tLogLocalities;
logSet.isLocal = log.isLocal;
logSet.hasBest = log.hasBest;
oldData.tLogs.push_back(logSet);
logSet->tLogReplicationFactor = log.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum;
logSet->tLogPolicy = log.tLogPolicy;
logSet->tLogLocalities = log.tLogLocalities;
logSet->isLocal = log.isLocal;
logSet->hasBest = log.hasBest;
}
oldData.epochEnd = old.epochEnd;
oldLogData.push_back(oldData);
}
state Future<Void> rejoins = trackRejoins( dbgid, allLogServers, rejoinRequests );
tLogReply.resize(logServers.size());
for( int i=0; i < logServers.size(); i++ ) {
for(int t=0; t<logServers[i].logServers.size(); t++) {
tLogReply[i].push_back( lockTLog( dbgid, logServers[i].logServers[t]) );
for(int t=0; t<logServers[i]->logServers.size(); t++) {
tLogReply[i].push_back( lockTLog( dbgid, logServers[i]->logServers[t]) );
}
}
@@ -629,7 +653,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Optional<Version> end;
Version knownCommittedVersion = 0;
for(int log = 0; log < logServers.size(); log++) {
if(!logServers[log].isLocal) {
if(!logServers[log]->isLocal) {
continue;
}
@@ -637,10 +661,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// have to be strictly less than the replication factor. Otherwise there could be a replica set consistent entirely of servers that
// are out of date due to not being in the write quorum or unavailable due to not being in the read quorum.
// So (N - W) + (N - R) < F, and optimally (N-W)+(N-R)=F-1. Thus R=2N+1-F-W.
state int requiredCount = (int)logServers[log].logServers.size()+1 - logServers[log].tLogReplicationFactor + logServers[log].tLogWriteAntiQuorum;
ASSERT( requiredCount > 0 && requiredCount <= logServers[log].logServers.size() );
ASSERT( logServers[log].tLogReplicationFactor >= 1 && logServers[log].tLogReplicationFactor <= logServers[log].logServers.size() );
ASSERT( logServers[log].tLogWriteAntiQuorum >= 0 && logServers[log].tLogWriteAntiQuorum < logServers[log].logServers.size() );
state int requiredCount = (int)logServers[log]->logServers.size()+1 - logServers[log]->tLogReplicationFactor + logServers[log]->tLogWriteAntiQuorum;
ASSERT( requiredCount > 0 && requiredCount <= logServers[log]->logServers.size() );
ASSERT( logServers[log]->tLogReplicationFactor >= 1 && logServers[log]->tLogReplicationFactor <= logServers[log]->logServers.size() );
ASSERT( logServers[log]->tLogWriteAntiQuorum >= 0 && logServers[log]->tLogWriteAntiQuorum < logServers[log]->logServers.size() );
std::vector<LocalityData> availableItems, badCombo;
std::vector<TLogLockResult> results;
@@ -649,52 +673,52 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
double t = timer();
cycles++;
for(int t=0; t<logServers[log].logServers.size(); t++) {
for(int t=0; t<logServers[log]->logServers.size(); t++) {
if (tLogReply[log][t].isReady() && !tLogReply[log][t].isError() && !logFailed[log][t]->get()) {
results.push_back(tLogReply[log][t].get());
availableItems.push_back(logServers[log].tLogLocalities[t]);
availableItems.push_back(logServers[log]->tLogLocalities[t]);
sServerState += 'a';
}
else {
unResponsiveSet.add(logServers[log].tLogLocalities[t]);
unResponsiveSet.add(logServers[log]->tLogLocalities[t]);
sServerState += 'f';
}
}
// Check if the list of results is not larger than the anti quorum
bool bTooManyFailures = (results.size() <= logServers[log].tLogWriteAntiQuorum);
bool bTooManyFailures = (results.size() <= logServers[log]->tLogWriteAntiQuorum);
// Check if failed logs complete the policy
bTooManyFailures = bTooManyFailures || ((unResponsiveSet.size() >= logServers[log].tLogReplicationFactor) && (unResponsiveSet.validate(logServers[log].tLogPolicy)));
bTooManyFailures = bTooManyFailures || ((unResponsiveSet.size() >= logServers[log]->tLogReplicationFactor) && (unResponsiveSet.validate(logServers[log]->tLogPolicy)));
// Check all combinations of the AntiQuorum within the failed
if ((!bTooManyFailures) && (logServers[log].tLogWriteAntiQuorum) && (!validateAllCombinations(badCombo, unResponsiveSet, logServers[log].tLogPolicy, availableItems, logServers[log].tLogWriteAntiQuorum, false))) {
if ((!bTooManyFailures) && (logServers[log]->tLogWriteAntiQuorum) && (!validateAllCombinations(badCombo, unResponsiveSet, logServers[log]->tLogPolicy, availableItems, logServers[log]->tLogWriteAntiQuorum, false))) {
TraceEvent("EpochEndBadCombo", dbgid).detail("Cycles", cycles)
.detail("logNum", log)
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers[log].logServers.size() - results.size())
.detail("Absent", logServers[log]->logServers.size() - results.size())
.detail("ServerState", sServerState)
.detail("ReplicationFactor", logServers[log].tLogReplicationFactor)
.detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum)
.detail("Policy", logServers[log].tLogPolicy->info())
.detail("ReplicationFactor", logServers[log]->tLogReplicationFactor)
.detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum)
.detail("Policy", logServers[log]->tLogPolicy->info())
.detail("TooManyFailures", bTooManyFailures)
.detail("LogZones", ::describeZones(logServers[log].tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log].tLogLocalities));
.detail("LogZones", ::describeZones(logServers[log]->tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities));
bTooManyFailures = true;
}
// If too many TLogs are failed for recovery to be possible, we could wait forever here.
//Void _ = wait( smartQuorum( tLogReply, requiredCount, SERVER_KNOBS->RECOVERY_TLOG_SMART_QUORUM_DELAY ) || rejoins );
ASSERT(logServers[log].logServers.size() == tLogReply[log].size());
ASSERT(logServers[log]->logServers.size() == tLogReply[log].size());
if (!bTooManyFailures) {
std::sort( results.begin(), results.end(), sort_by_end() );
int absent = logServers[log].logServers.size() - results.size();
int safe_range_begin = logServers[log].tLogWriteAntiQuorum;
int new_safe_range_begin = std::min(logServers[log].tLogWriteAntiQuorum, (int)(results.size()-1));
int safe_range_end = logServers[log].tLogReplicationFactor - absent;
int absent = logServers[log]->logServers.size() - results.size();
int safe_range_begin = logServers[log]->tLogWriteAntiQuorum;
int new_safe_range_begin = std::min(logServers[log]->tLogWriteAntiQuorum, (int)(results.size()-1));
int safe_range_end = logServers[log]->tLogReplicationFactor - absent;
if( ( prevState.logSystemType == 2 && (!last_end.present() || ((safe_range_end > 0) && (safe_range_end-1 < results.size()) && results[ safe_range_end-1 ].end < last_end.get())) ) ) {
knownCommittedVersion = std::max(knownCommittedVersion, end.get() - (g_network->isSimulated() ? 10*SERVER_KNOBS->VERSIONS_PER_SECOND : SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)); //In simulation this must be the maximum MAX_READ_TRANSACTION_LIFE_VERSIONS
@@ -704,15 +728,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TraceEvent("LogSystemRecovery", dbgid).detail("Cycles", cycles)
.detail("logNum", log)
.detail("TotalServers", logServers[log].logServers.size())
.detail("TotalServers", logServers[log]->logServers.size())
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers[log].logServers.size() - results.size())
.detail("Absent", logServers[log]->logServers.size() - results.size())
.detail("ServerState", sServerState)
.detail("ReplicationFactor", logServers[log].tLogReplicationFactor)
.detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum)
.detail("Policy", logServers[log].tLogPolicy->info())
.detail("ReplicationFactor", logServers[log]->tLogReplicationFactor)
.detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum)
.detail("Policy", logServers[log]->tLogPolicy->info())
.detail("TooManyFailures", bTooManyFailures)
.detail("LastVersion", (last_end.present()) ? last_end.get() : -1L)
.detail("RecoveryVersion", ((safe_range_end > 0) && (safe_range_end-1 < results.size())) ? results[ safe_range_end-1 ].end : -1)
@@ -720,8 +744,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
.detail("SafeBegin", safe_range_begin)
.detail("SafeEnd", safe_range_end)
.detail("NewSafeBegin", new_safe_range_begin)
.detail("LogZones", ::describeZones(logServers[log].tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log].tLogLocalities))
.detail("LogZones", ::describeZones(logServers[log]->tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities))
.detail("tLogs", (int)prevState.tLogs.size())
.detail("oldTlogsSize", (int)prevState.oldTLogData.size())
.detail("logSystemType", prevState.logSystemType)
@@ -734,14 +758,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
else {
TraceEvent("LogSystemUnchangedRecovery", dbgid).detail("Cycles", cycles)
.detail("logNum", log)
.detail("TotalServers", logServers[log].logServers.size())
.detail("TotalServers", logServers[log]->logServers.size())
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("ServerState", sServerState)
.detail("ReplicationFactor", logServers[log].tLogReplicationFactor)
.detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum)
.detail("Policy", logServers[log].tLogPolicy->info())
.detail("ReplicationFactor", logServers[log]->tLogReplicationFactor)
.detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum)
.detail("Policy", logServers[log]->tLogPolicy->info())
.detail("TooManyFailures", bTooManyFailures)
.detail("LastVersion", (last_end.present()) ? last_end.get() : -1L)
.detail("RecoveryVersion", ((safe_range_end > 0) && (safe_range_end-1 < results.size())) ? results[ safe_range_end-1 ].end : -1)
@@ -749,8 +773,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
.detail("SafeBegin", safe_range_begin)
.detail("SafeEnd", safe_range_end)
.detail("NewSafeBegin", new_safe_range_begin)
.detail("LogZones", ::describeZones(logServers[log].tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log].tLogLocalities));
.detail("LogZones", ::describeZones(logServers[log]->tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities));
}
}
// Too many failures
@@ -759,17 +783,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
.detail("logNum", log)
.detail("AvailableServers", results.size())
.detail("RequiredServers", requiredCount)
.detail("TotalServers", logServers[log].logServers.size())
.detail("TotalServers", logServers[log]->logServers.size())
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("ServerState", sServerState)
.detail("ReplicationFactor", logServers[log].tLogReplicationFactor)
.detail("AntiQuorum", logServers[log].tLogWriteAntiQuorum)
.detail("Policy", logServers[log].tLogPolicy->info())
.detail("ReplicationFactor", logServers[log]->tLogReplicationFactor)
.detail("AntiQuorum", logServers[log]->tLogWriteAntiQuorum)
.detail("Policy", logServers[log]->tLogPolicy->info())
.detail("TooManyFailures", bTooManyFailures)
.detail("LogZones", ::describeZones(logServers[log].tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log].tLogLocalities));
.detail("LogZones", ::describeZones(logServers[log]->tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(logServers[log]->tLogLocalities));
}
}
@@ -800,12 +824,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
// Wait for anything relevant to change
std::vector<Future<Void>> changes;
for(int i=0; i < logServers.size(); i++) {
if(logServers[i].isLocal) {
for(int j=0; j < logServers[i].logServers.size(); j++) {
if(logServers[i]->isLocal) {
for(int j=0; j < logServers[i]->logServers.size(); j++) {
if (!tLogReply[i][j].isReady())
changes.push_back( ready(tLogReply[i][j]) );
else {
changes.push_back( logServers[i].logServers[j]->onChange() );
changes.push_back( logServers[i]->logServers[j]->onChange() );
changes.push_back( logFailed[i][j]->onChange() );
}
}
@@ -832,7 +856,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Void _ = wait( waitForAll(logRouterInitializationReplies) );
for( int i = 0; i < logRouterInitializationReplies.size(); i++ ) {
self->tLogs[logNum].logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(logRouterInitializationReplies[i].get()) ) ) );
self->tLogs[logNum]->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(logRouterInitializationReplies[i].get()) ) ) );
}
self->logSystemConfigChanges.trigger();
@@ -853,14 +877,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.remoteTag = SERVER_KNOBS->MAX_TAG + i;
}
self->tLogs[logNum].tLogLocalities.resize( remoteTLogWorkers.size() );
self->tLogs[logNum].logServers.resize( remoteTLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
self->tLogs[logNum].updateLocalitySet(remoteTLogWorkers);
self->tLogs[logNum]->tLogLocalities.resize( remoteTLogWorkers.size() );
self->tLogs[logNum]->logServers.resize( remoteTLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
self->tLogs[logNum]->updateLocalitySet(remoteTLogWorkers);
vector<int> locations;
for( Tag tag : self->epochEndTags ) {
locations.clear();
self->tLogs[logNum].getPushLocations( vector<Tag>(1, tag), locations, 0 );
self->tLogs[logNum]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
for(int loc : locations)
remoteTLogReqs[ loc ].recoverTags.push_back( tag );
}
@@ -871,16 +895,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Void _ = wait( waitForAll(remoteTLogInitializationReplies) );
for( int i = 0; i < remoteTLogInitializationReplies.size(); i++ ) {
self->tLogs[logNum].logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(remoteTLogInitializationReplies[i].get()) ) ) );
self->tLogs[logNum]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(remoteTLogInitializationReplies[i].get()) ) );
self->tLogs[logNum]->tLogLocalities[i] = remoteTLogWorkers[i].locality;
}
std::vector<Future<Void>> recoveryComplete;
for( int i = 0; i < self->tLogs[logNum].logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( self->tLogs[logNum].logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
for( int i = 0; i < self->tLogs[logNum]->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( self->tLogs[logNum]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
Void _ = wait( waitForAll(recoveryComplete) );
self->tLogs[logNum].logRouters.resize(logRouterWorkers.size());
self->tLogs[logNum]->logRouters.resize(logRouterWorkers.size());
return Void();
}
@@ -894,12 +919,18 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->logSystemType = 2;
logSystem->tLogs.resize(2);
logSystem->tLogs[0].tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum;
logSystem->tLogs[0].tLogReplicationFactor = configuration.tLogReplicationFactor;
logSystem->tLogs[0].tLogPolicy = configuration.tLogPolicy;
logSystem->tLogs[0] = Reference<LogSet>( new LogSet() );
logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum;
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy;
logSystem->tLogs[0]->isLocal = true;
logSystem->tLogs[0]->hasBest = true;
logSystem->tLogs[1].tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
logSystem->tLogs[1].tLogPolicy = configuration.remoteTLogPolicy;
logSystem->tLogs[1] = Reference<LogSet>( new LogSet() );
logSystem->tLogs[1]->tLogReplicationFactor = configuration.remoteTLogReplicationFactor;
logSystem->tLogs[1]->tLogPolicy = configuration.remoteTLogPolicy;
logSystem->tLogs[1]->isLocal = false;
logSystem->tLogs[1]->hasBest = true;
if(oldLogSystem->tLogs.size()) {
logSystem->oldLogData.push_back(OldLogData());
@@ -924,16 +955,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.epoch = recoveryCount;
}
logSystem->tLogs[0].tLogLocalities.resize( tLogWorkers.size() );
logSystem->tLogs[0].logServers.resize( tLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[0].updateLocalitySet(tLogWorkers);
logSystem->tLogs[0]->tLogLocalities.resize( tLogWorkers.size() );
logSystem->tLogs[0]->logServers.resize( tLogWorkers.size() ); // Dummy interfaces, so that logSystem->getPushLocations() below uses the correct size
logSystem->tLogs[0]->updateLocalitySet(tLogWorkers);
std::vector<int> locations;
state Tag minTag = 0;
for( Tag tag : oldLogSystem->getEpochEndTags() ) {
minTag = std::min(minTag, tag);
locations.clear();
logSystem->tLogs[0].getPushLocations( vector<Tag>(1, tag), locations, 0 );
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
for(int loc : locations)
reqs[ loc ].recoverTags.push_back( tag );
}
@@ -944,16 +975,16 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Void _ = wait( waitForAll( initializationReplies ) );
for( int i = 0; i < initializationReplies.size(); i++ ) {
logSystem->tLogs[0].logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );
logSystem->tLogs[0].tLogLocalities[i] = tLogWorkers[i].locality;
logSystem->tLogs[0]->logServers[i] = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(initializationReplies[i].get()) ) );
logSystem->tLogs[0]->tLogLocalities[i] = tLogWorkers[i].locality;
}
//Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running recoveries causing tests to timeout
if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation) throw master_recovery_failed();
std::vector<Future<Void>> recoveryComplete;
for( int i = 0; i < logSystem->tLogs[0].logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0].logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
for( int i = 0; i < logSystem->tLogs[0]->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
logSystem->recoveryComplete = waitForAll(recoveryComplete);
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, oldLogSystem->epochEndVersion.get(), minTag, 1);