Storage server tags cannot change once used in the keyServer location map
This commit is contained in:
parent
9ccc417a4f
commit
8166e1af8b
|
@ -49,16 +49,31 @@ const Value keyServersValue( Standalone<RangeResultRef> result, const std::vecto
|
|||
std::vector<Tag> srcTag;
|
||||
std::vector<Tag> destTag;
|
||||
|
||||
bool foundOldLocality = false;
|
||||
for (const KeyValueRef kv : result) {
|
||||
UID uid = decodeServerTagKey(kv.key);
|
||||
if (std::find(src.begin(), src.end(), uid) != src.end()) {
|
||||
srcTag.push_back( decodeServerTagValue(kv.value) );
|
||||
if(srcTag.back().locality == tagLocalityUpgraded) {
|
||||
foundOldLocality = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (std::find(dest.begin(), dest.end(), uid) != dest.end()) {
|
||||
destTag.push_back( decodeServerTagValue(kv.value) );
|
||||
if(destTag.back().locality == tagLocalityUpgraded) {
|
||||
foundOldLocality = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(foundOldLocality || src.size() != srcTag.size() || dest.size() != destTag.size()) {
|
||||
ASSERT_WE_THINK(foundOldLocality);
|
||||
BinaryWriter wr(IncludeVersion()); wr << src << dest;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
return keyServersValue(srcTag, destTag);
|
||||
}
|
||||
const Value keyServersValue( const std::vector<Tag>& srcTag, const std::vector<Tag>& destTag ) {
|
||||
|
@ -106,6 +121,21 @@ void decodeKeyServersValue( Standalone<RangeResultRef> result, const ValueRef& v
|
|||
}
|
||||
std::sort(src.begin(), src.end());
|
||||
std::sort(dest.begin(), dest.end());
|
||||
if(src.size() != srcTag.size() || dest.size() != destTag.size()) {
|
||||
TraceEvent(SevError, "AttemptedToDecodeMissingTag");
|
||||
for (const KeyValueRef kv : result) {
|
||||
Tag tag = decodeServerTagValue(kv.value);
|
||||
UID serverID = decodeServerTagKey(kv.key);
|
||||
TraceEvent("TagUIDMap").detail("Tag", tag.toString()).detail("UID", serverID.toString());
|
||||
}
|
||||
for(auto& it : srcTag) {
|
||||
TraceEvent("SrcTag").detail("Tag", it.toString());
|
||||
}
|
||||
for(auto& it : destTag) {
|
||||
TraceEvent("DestTag").detail("Tag", it.toString());
|
||||
}
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
||||
const KeyRangeRef conflictingKeysRange = KeyRangeRef(LiteralStringRef("\xff\xff/transaction/conflicting_keys/"),
|
||||
|
|
|
@ -1646,10 +1646,12 @@ ACTOR static Future<Void> rejoinServer( MasterProxyInterface proxy, ProxyCommitD
|
|||
rep.history.push_back(std::make_pair(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value)));
|
||||
}
|
||||
auto localityKey = commitData->txnStateStore->readValue(tagLocalityListKeyFor(req.dcId)).get();
|
||||
rep.newLocality = false;
|
||||
if( localityKey.present() ) {
|
||||
rep.newLocality = false;
|
||||
int8_t locality = decodeTagLocalityListValue(localityKey.get());
|
||||
if(locality != rep.tag.locality) {
|
||||
if(rep.tag.locality != tagLocalityUpgraded && locality != rep.tag.locality) {
|
||||
TraceEvent(SevWarnAlways, "SSRejoinedWithChangedLocality").detail("Tag", rep.tag.toString()).detail("DcId", req.dcId).detail("NewLocality", locality);
|
||||
} else if(locality != rep.tag.locality) {
|
||||
uint16_t tagId = 0;
|
||||
std::vector<uint16_t> usedTags;
|
||||
auto tagKeys = commitData->txnStateStore->readRange(serverTagKeys).get();
|
||||
|
@ -1678,6 +1680,8 @@ ACTOR static Future<Void> rejoinServer( MasterProxyInterface proxy, ProxyCommitD
|
|||
}
|
||||
rep.newTag = Tag(locality, tagId);
|
||||
}
|
||||
} else if(rep.tag.locality != tagLocalityUpgraded) {
|
||||
TraceEvent(SevWarnAlways, "SSRejoinedWithUnknownLocality").detail("Tag", rep.tag.toString()).detail("DcId", req.dcId);
|
||||
} else {
|
||||
rep.newLocality = true;
|
||||
int8_t maxTagLocality = -1;
|
||||
|
|
Loading…
Reference in New Issue