Fix issues configuring from sharded txs tag to not

Which is an intermingling of what should be two commits:

1. Rely on TLogVersion instead of txsTags==0

2. Copy and index sharded txsTags between KCV and RV as txsTag when
configuring log_version 4->3.
This commit is contained in:
Alex Miller 2019-07-17 01:25:09 -07:00
parent 95487861be
commit e9684a1f63
3 changed files with 30 additions and 20 deletions

View File

@ -831,7 +831,11 @@ void commitMessages( TLogData* self, Reference<LogData> logData, Version version
tag.id = tag.id % logData->logRouterTags;
}
if(tag.locality == tagLocalityTxs) {
tag.id = tag.id % logData->txsTags;
if (logData->txsTags > 0) {
tag.id = tag.id % logData->txsTags;
} else {
tag = txsTag;
}
}
Reference<LogData::TagData> tagData = logData->getTagData(tag);
if(!tagData) {

View File

@ -1081,7 +1081,13 @@ void commitMessages( TLogData* self, Reference<LogData> logData, Version version
tag.id = tag.id % logData->logRouterTags;
}
if(tag.locality == tagLocalityTxs) {
tag.id = tag.id % logData->txsTags;
// TODO: When TLogVersion is threaded into the TLog, this should be replaced with a >V4 check,
// as txsTag is improperly correspondingly overridden in TLog recruitment requests.
if (logData->txsTags > 0) {
tag.id = tag.id % logData->txsTags;
} else {
tag = txsTag;
}
}
Reference<LogData::TagData> tagData = logData->getTagData(tag);
if(!tagData) {

View File

@ -267,6 +267,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->expectedLogSets = lsConf.expectedLogSets;
logSystem->logRouterTags = lsConf.logRouterTags;
logSystem->txsTags = lsConf.txsTags;
ASSERT( !(lsConf.txsTags > 0 && lsConf.tLogs[0].tLogVersion < TLogVersion::V4) );
logSystem->recruitmentID = lsConf.recruitmentID;
logSystem->stopped = lsConf.stopped;
if(useRecoveredAt) {
@ -752,10 +753,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TraceEvent("TLogPeekTxs", dbgid).detail("Begin", begin).detail("End", end).detail("LocalEnd", localEnd).detail("PeekLocality", peekLocality);
int maxTxsTags = txsTags;
bool needsOldTxs = txsTags==0;
bool needsOldTxs = tLogs[0]->tLogVersion < TLogVersion::V4;
for(auto& it : oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.txsTags);
needsOldTxs = needsOldTxs || it.txsTags==0;
needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4;
}
@ -1002,7 +1003,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual void popTxs( Version upTo, int8_t popLocality ) {
if(txsTags == 0) {
if( getTLogVersion() < TLogVersion::V4 ) {
pop(upTo, txsTag, 0, popLocality);
} else {
for(int i = 0; i < txsTags; i++) {
@ -1230,9 +1231,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual Tag getRandomTxsTag() const {
if(txsTags==0) {
return txsTag;
}
return Tag(tagLocalityTxs, deterministicRandom()->randomInt(0, txsTags));
}
@ -1844,6 +1842,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state vector<Future<TLogInterface>> remoteTLogInitializationReplies;
vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() );
bool nonShardedTxs = self->getTLogVersion() < TLogVersion::V4;
if(oldLogSystem->logRouterTags == 0) {
std::vector<int> locations;
for( Tag tag : localTags ) {
@ -1855,20 +1854,20 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(oldLogSystem->tLogs.size()) {
int maxTxsTags = oldLogSystem->txsTags;
bool needsOldTxs = oldLogSystem->txsTags==0;
bool needsOldTxs = oldLogSystem->tLogs[0]->tLogVersion < TLogVersion::V4;
for(auto& it : oldLogSystem->oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.txsTags);
needsOldTxs = needsOldTxs || it.txsTags==0;
needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4;
}
for(int i = needsOldTxs?-1:0; i < maxTxsTags; i++) {
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || self->txsTags==0) ? txsTag : Tag(tagLocalityTxs, i%self->txsTags);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%self->txsTags);
locations.clear();
logSet->getPushLocations( vector<Tag>(1, pushTag), locations, 0 );
logSet->getPushLocations( {pushTag}, locations, 0 );
for(int loc : locations)
remoteTLogReqs[ loc ].recoverTags.push_back( tag );
}
if(self->txsTags == 0) {
if(nonShardedTxs) {
localTags.push_back(txsTag);
} else {
for(int i = 0; i < self->txsTags; i++) {
@ -1932,7 +1931,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->recoveredAt = oldLogSystem->recoverAt;
logSystem->repopulateRegionAntiQuorum = configuration.repopulateRegionAntiQuorum;
logSystem->recruitmentID = deterministicRandom()->randomUniqueID();
logSystem->txsTags = recr.tLogs.size();
logSystem->txsTags = configuration.tLogVersion >= TLogVersion::V4 ? recr.tLogs.size() : 0;
oldLogSystem->recruitmentID = logSystem->recruitmentID;
if(configuration.usableRegions > 1) {
@ -1952,10 +1951,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state RegionInfo region = configuration.getRegion(recr.dcId);
state int maxTxsTags = oldLogSystem->txsTags;
state bool needsOldTxs = oldLogSystem->txsTags==0;
state bool needsOldTxs = oldLogSystem->tLogs.size() && oldLogSystem->getTLogVersion() < TLogVersion::V4;
for(auto& it : oldLogSystem->oldLogData) {
maxTxsTags = std::max<int>(maxTxsTags, it.txsTags);
needsOldTxs = needsOldTxs || it.txsTags==0;
needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4;
}
if(region.satelliteTLogReplicationFactor > 0) {
@ -2063,16 +2062,17 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
Tag tag = Tag(tagLocalityLogRouter, i);
reqs[ logSystem->tLogs[0]->bestLocationFor( tag ) ].recoverTags.push_back( tag );
}
bool nonShardedTxs = logSystem->getTLogVersion() < TLogVersion::V4;
if(oldLogSystem->tLogs.size()) {
for(int i = needsOldTxs?-1:0; i < maxTxsTags; i++) {
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || logSystem->txsTags==0) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
locations.clear();
logSystem->tLogs[0]->getPushLocations( vector<Tag>(1, pushTag), locations, 0 );
for(int loc : locations)
reqs[ loc ].recoverTags.push_back( tag );
}
if(logSystem->txsTags == 0) {
if(nonShardedTxs) {
localTags.push_back(txsTag);
} else {
for(int i = 0; i < logSystem->txsTags; i++) {
@ -2128,13 +2128,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(oldLogSystem->tLogs.size()) {
for(int i = needsOldTxs?-1:0; i < maxTxsTags; i++) {
Tag tag = i==-1 ? txsTag : Tag(tagLocalityTxs, i);
Tag pushTag = (i==-1 || logSystem->txsTags==0) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
Tag pushTag = (i==-1 || nonShardedTxs) ? txsTag : Tag(tagLocalityTxs, i%logSystem->txsTags);
locations.clear();
logSystem->tLogs[1]->getPushLocations( {pushTag}, locations, 0 );
for(int loc : locations)
sreqs[ loc ].recoverTags.push_back( tag );
}
if(logSystem->txsTags == 0) {
if(nonShardedTxs) {
satelliteTags.push_back(txsTag);
} else {
for(int i = 0; i < logSystem->txsTags; i++) {