1191 lines
41 KiB
C++
1191 lines
41 KiB
C++
/*
|
|
* ApplyMetadataMutation.cpp
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include "fdbclient/BackupAgent.actor.h"
|
|
#include "fdbclient/KeyBackedTypes.h" // for key backed map codecs for tss mapping
|
|
#include "fdbclient/MutationList.h"
|
|
#include "fdbclient/Notified.h"
|
|
#include "fdbclient/SystemData.h"
|
|
#include "fdbserver/ApplyMetadataMutation.h"
|
|
#include "fdbserver/IKeyValueStore.h"
|
|
#include "fdbserver/LogProtocolMessage.h"
|
|
#include "fdbserver/LogSystem.h"
|
|
|
|
Reference<StorageInfo> getStorageInfo(UID id,
|
|
std::map<UID, Reference<StorageInfo>>* storageCache,
|
|
IKeyValueStore* txnStateStore) {
|
|
Reference<StorageInfo> storageInfo;
|
|
auto cacheItr = storageCache->find(id);
|
|
if (cacheItr == storageCache->end()) {
|
|
storageInfo = makeReference<StorageInfo>();
|
|
storageInfo->tag = decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get());
|
|
storageInfo->interf = decodeServerListValue(txnStateStore->readValue(serverListKeyFor(id)).get().get());
|
|
(*storageCache)[id] = storageInfo;
|
|
} else {
|
|
storageInfo = cacheItr->second;
|
|
}
|
|
return storageInfo;
|
|
}
|
|
namespace {
|
|
|
|
inline bool isSystemKey(KeyRef key) {
|
|
return key.size() && key[0] == systemKeys.begin[0];
|
|
}
|
|
|
|
// It is incredibly important that any modifications to txnStateStore are done in such a way that the same operations
|
|
// will be done on all commit proxies at the same time. Otherwise, the data stored in txnStateStore will become
|
|
// corrupted.
|
|
class ApplyMetadataMutationsImpl {
|
|
|
|
public:
|
|
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
|
|
const UID& dbgid_,
|
|
Arena& arena_,
|
|
const VectorRef<MutationRef>& mutations_,
|
|
IKeyValueStore* txnStateStore_)
|
|
: spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_),
|
|
confChange(dummyConfChange) {}
|
|
|
|
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
|
|
Arena& arena_,
|
|
const VectorRef<MutationRef>& mutations_,
|
|
ProxyCommitData& proxyCommitData_,
|
|
Reference<ILogSystem> logSystem_,
|
|
LogPushData* toCommit_,
|
|
bool& confChange_,
|
|
Version version,
|
|
Version popVersion_,
|
|
bool initialCommit_)
|
|
: spanContext(spanContext_), dbgid(proxyCommitData_.dbgid), arena(arena_), mutations(mutations_),
|
|
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), confChange(confChange_),
|
|
logSystem(logSystem_), version(version), popVersion(popVersion_),
|
|
vecBackupKeys(&proxyCommitData_.vecBackupKeys), keyInfo(&proxyCommitData_.keyInfo),
|
|
cacheInfo(&proxyCommitData_.cacheInfo),
|
|
uid_applyMutationsData(proxyCommitData_.firstProxy ? &proxyCommitData_.uid_applyMutationsData : nullptr),
|
|
commit(proxyCommitData_.commit), cx(proxyCommitData_.cx), committedVersion(&proxyCommitData_.committedVersion),
|
|
storageCache(&proxyCommitData_.storageCache), tag_popped(&proxyCommitData_.tag_popped),
|
|
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
|
|
initialCommit(initialCommit_) {}
|
|
|
|
private:
|
|
// The following variables are incoming parameters
|
|
|
|
const SpanID& spanContext;
|
|
|
|
const UID& dbgid;
|
|
|
|
Arena& arena;
|
|
|
|
const VectorRef<MutationRef>& mutations;
|
|
|
|
// Transaction KV store
|
|
IKeyValueStore* txnStateStore;
|
|
|
|
// non-null if these mutations were part of a new commit handled by this commit proxy
|
|
LogPushData* toCommit = nullptr;
|
|
|
|
// Flag indicates if the configure is changed
|
|
bool& confChange;
|
|
|
|
Reference<ILogSystem> logSystem = Reference<ILogSystem>();
|
|
Version version = invalidVersion;
|
|
Version popVersion = 0;
|
|
KeyRangeMap<std::set<Key>>* vecBackupKeys = nullptr;
|
|
KeyRangeMap<ServerCacheInfo>* keyInfo = nullptr;
|
|
KeyRangeMap<bool>* cacheInfo = nullptr;
|
|
std::map<Key, ApplyMutationsData>* uid_applyMutationsData = nullptr;
|
|
RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>();
|
|
Database cx = Database();
|
|
NotifiedVersion* committedVersion = nullptr;
|
|
std::map<UID, Reference<StorageInfo>>* storageCache = nullptr;
|
|
std::map<Tag, Version>* tag_popped = nullptr;
|
|
std::unordered_map<UID, StorageServerInterface>* tssMapping = nullptr;
|
|
|
|
std::map<TenantName, TenantMapEntry>* tenantMap = nullptr;
|
|
|
|
// true if the mutations were already written to the txnStateStore as part of recovery
|
|
bool initialCommit = false;
|
|
|
|
private:
|
|
// The following variables are used internally
|
|
|
|
std::map<KeyRef, MutationRef> cachedRangeInfo;
|
|
|
|
// Testing Storage Server removal (clearing serverTagKey) needs to read tss server list value to determine it is a
|
|
// tss + find partner's tag to send the private mutation. Since the removeStorageServer transaction clears both the
|
|
// storage list and server tag, we have to enforce ordering, proccessing the server tag first, and postpone the
|
|
// server list clear until the end;
|
|
std::vector<KeyRangeRef> tssServerListToRemove;
|
|
|
|
// Similar to tssServerListToRemove, the TSS mapping change key needs to read the server list at the end of the
|
|
// commit
|
|
std::vector<std::pair<UID, UID>> tssMappingToAdd;
|
|
|
|
private:
|
|
bool dummyConfChange = false;
|
|
|
|
private:
|
|
void checkSetKeyServersPrefix(MutationRef m) {
|
|
if (!m.param1.startsWith(keyServersPrefix)) {
|
|
return;
|
|
}
|
|
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
|
|
if (!keyInfo) {
|
|
return;
|
|
}
|
|
KeyRef k = m.param1.removePrefix(keyServersPrefix);
|
|
if (k == allKeys.end) {
|
|
return;
|
|
}
|
|
|
|
KeyRef end = keyInfo->rangeContaining(k).end();
|
|
KeyRangeRef insertRange(k, end);
|
|
std::vector<UID> src, dest;
|
|
// txnStateStore is always an in-memory KVS, and must always be recovered before
|
|
// applyMetadataMutations is called, so a wait here should never be needed.
|
|
Future<RangeResult> fResult = txnStateStore->readRange(serverTagKeys);
|
|
decodeKeyServersValue(fResult.get(), m.param2, src, dest);
|
|
|
|
ASSERT(storageCache);
|
|
ServerCacheInfo info;
|
|
info.tags.reserve(src.size() + dest.size());
|
|
info.src_info.reserve(src.size());
|
|
info.dest_info.reserve(dest.size());
|
|
|
|
for (const auto& id : src) {
|
|
auto storageInfo = getStorageInfo(id, storageCache, txnStateStore);
|
|
ASSERT(!storageInfo->interf.isTss());
|
|
ASSERT(storageInfo->tag != invalidTag);
|
|
info.tags.push_back(storageInfo->tag);
|
|
info.src_info.push_back(storageInfo);
|
|
}
|
|
for (const auto& id : dest) {
|
|
auto storageInfo = getStorageInfo(id, storageCache, txnStateStore);
|
|
ASSERT(!storageInfo->interf.isTss());
|
|
ASSERT(storageInfo->tag != invalidTag);
|
|
info.tags.push_back(storageInfo->tag);
|
|
info.dest_info.push_back(storageInfo);
|
|
}
|
|
uniquify(info.tags);
|
|
keyInfo->insert(insertRange, info);
|
|
}
|
|
|
|
void checkSetServerKeysPrefix(MutationRef m) {
|
|
if (!m.param1.startsWith(serverKeysPrefix)) {
|
|
return;
|
|
}
|
|
if (toCommit) {
|
|
Tag tag = decodeServerTagValue(
|
|
txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get());
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
TraceEvent(SevDebug, "SendingPrivateMutation", dbgid)
|
|
.detail("Original", m)
|
|
.detail("Privatized", privatized)
|
|
.detail("Server", serverKeysDecodeServer(m.param1))
|
|
.detail("TagKey", serverTagKeyFor(serverKeysDecodeServer(m.param1)))
|
|
.detail("Tag", tag.toString());
|
|
|
|
toCommit->addTag(tag);
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
|
|
void checkSetServerTagsPrefix(MutationRef m) {
|
|
if (!m.param1.startsWith(serverTagPrefix)) {
|
|
return;
|
|
}
|
|
UID id = decodeServerTagKey(m.param1);
|
|
Tag tag = decodeServerTagValue(m.param2);
|
|
|
|
if (toCommit) {
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString());
|
|
|
|
toCommit->addTag(tag);
|
|
toCommit->writeTypedMessage(LogProtocolMessage(), true);
|
|
toCommit->addTag(tag);
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
if (!initialCommit) {
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
if (storageCache) {
|
|
auto cacheItr = storageCache->find(id);
|
|
if (cacheItr == storageCache->end()) {
|
|
Reference<StorageInfo> storageInfo = makeReference<StorageInfo>();
|
|
storageInfo->tag = tag;
|
|
Optional<Key> interfKey = txnStateStore->readValue(serverListKeyFor(id)).get();
|
|
if (interfKey.present()) {
|
|
storageInfo->interf = decodeServerListValue(interfKey.get());
|
|
}
|
|
(*storageCache)[id] = storageInfo;
|
|
} else {
|
|
cacheItr->second->tag = tag;
|
|
// These tag vectors will be repopulated by the proxy when it detects their sizes are 0.
|
|
for (auto& it : keyInfo->ranges()) {
|
|
it.value().tags.clear();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void checkSetStorageCachePrefix(MutationRef m) {
|
|
if (!m.param1.startsWith(storageCachePrefix))
|
|
return;
|
|
if (cacheInfo) {
|
|
KeyRef k = m.param1.removePrefix(storageCachePrefix);
|
|
|
|
// Create a private mutation for storage servers
|
|
// This is done to make the storage servers aware of the cached key-ranges
|
|
if (toCommit) {
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
|
|
cachedRangeInfo[k] = privatized;
|
|
}
|
|
if (k != allKeys.end) {
|
|
KeyRef end = cacheInfo->rangeContaining(k).end();
|
|
std::vector<uint16_t> serverIndices;
|
|
decodeStorageCacheValue(m.param2, serverIndices);
|
|
cacheInfo->insert(KeyRangeRef(k, end), serverIndices.size() > 0);
|
|
}
|
|
}
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
}
|
|
|
|
void checkSetCacheKeysPrefix(MutationRef m) {
|
|
if (!m.param1.startsWith(cacheKeysPrefix) || toCommit == nullptr) {
|
|
return;
|
|
}
|
|
|
|
// Create a private mutation for cache servers
|
|
// This is done to make the cache servers aware of the cached key-ranges
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
toCommit->addTag(cacheTag);
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
|
|
void checkSetConfigKeys(MutationRef m) {
|
|
if (!m.param1.startsWith(configKeysPrefix) && m.param1 != coordinatorsKey) {
|
|
return;
|
|
}
|
|
if (Optional<StringRef>(m.param2) !=
|
|
txnStateStore->readValue(m.param1)
|
|
.get()
|
|
.castTo<StringRef>()) { // FIXME: Make this check more specific, here or by reading
|
|
// configuration whenever there is a change
|
|
if ((!m.param1.startsWith(excludedServersPrefix) && m.param1 != excludedServersVersionKey) &&
|
|
(!m.param1.startsWith(failedServersPrefix) && m.param1 != failedServersVersionKey) &&
|
|
(!m.param1.startsWith(excludedLocalityPrefix) && m.param1 != excludedLocalityVersionKey) &&
|
|
(!m.param1.startsWith(failedLocalityPrefix) && m.param1 != failedLocalityVersionKey)) {
|
|
auto t = txnStateStore->readValue(m.param1).get();
|
|
TraceEvent("MutationRequiresRestart", dbgid)
|
|
.detail("M", m)
|
|
.detail("PrevValue", t.orDefault("(none)"_sr))
|
|
.detail("ToCommit", toCommit != nullptr);
|
|
confChange = true;
|
|
}
|
|
}
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
}
|
|
|
|
void checkSetChangeFeedPrefix(MutationRef m) {
|
|
if (!m.param1.startsWith(changeFeedPrefix)) {
|
|
return;
|
|
}
|
|
if (toCommit && keyInfo) {
|
|
KeyRange r = std::get<0>(decodeChangeFeedValue(m.param2));
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
auto ranges = keyInfo->intersectingRanges(r);
|
|
auto firstRange = ranges.begin();
|
|
++firstRange;
|
|
if (firstRange == ranges.end()) {
|
|
ranges.begin().value().populateTags();
|
|
toCommit->addTags(ranges.begin().value().tags);
|
|
} else {
|
|
std::set<Tag> allSources;
|
|
for (auto r : ranges) {
|
|
r.value().populateTags();
|
|
allSources.insert(r.value().tags.begin(), r.value().tags.end());
|
|
}
|
|
toCommit->addTags(allSources);
|
|
}
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
|
|
void checkSetServerListPrefix(MutationRef m) {
|
|
if (!m.param1.startsWith(serverListPrefix)) {
|
|
return;
|
|
}
|
|
if (!initialCommit) {
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
if (storageCache) {
|
|
UID id = decodeServerListKey(m.param1);
|
|
StorageServerInterface interf = decodeServerListValue(m.param2);
|
|
|
|
auto cacheItr = storageCache->find(id);
|
|
if (cacheItr == storageCache->end()) {
|
|
Reference<StorageInfo> storageInfo = makeReference<StorageInfo>();
|
|
storageInfo->interf = interf;
|
|
Optional<Key> tagKey = txnStateStore->readValue(serverTagKeyFor(id)).get();
|
|
if (tagKey.present()) {
|
|
storageInfo->tag = decodeServerTagValue(tagKey.get());
|
|
}
|
|
(*storageCache)[id] = storageInfo;
|
|
} else {
|
|
cacheItr->second->interf = interf;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void checkSetTSSMappingKeys(MutationRef m) {
|
|
if (!m.param1.startsWith(tssMappingKeys.begin)) {
|
|
return;
|
|
}
|
|
// Normally uses key backed map, so have to use same unpacking code here.
|
|
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
|
|
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
|
|
if (!initialCommit) {
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
}
|
|
if (tssMapping) {
|
|
tssMappingToAdd.push_back(std::pair(ssId, tssId));
|
|
}
|
|
|
|
if (toCommit) {
|
|
// send private mutation to SS that it now has a TSS pair
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
|
|
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
|
|
if (tagV.present()) {
|
|
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
}
|
|
|
|
void checkSetTSSQuarantineKeys(MutationRef m) {
|
|
if (!m.param1.startsWith(tssQuarantineKeys.begin) || initialCommit) {
|
|
return;
|
|
}
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
|
|
if (!toCommit) {
|
|
return;
|
|
}
|
|
UID tssId = decodeTssQuarantineKey(m.param1);
|
|
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get();
|
|
if (!ssiV.present()) {
|
|
return;
|
|
}
|
|
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
|
|
if (!ssi.isTss()) {
|
|
return;
|
|
}
|
|
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
|
if (tagV.present()) {
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
|
|
void checkSetApplyMutationsEndRange(MutationRef m) {
|
|
if (!m.param1.startsWith(applyMutationsEndRange.begin)) {
|
|
return;
|
|
}
|
|
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
|
|
if (uid_applyMutationsData == nullptr) {
|
|
return;
|
|
}
|
|
|
|
Key uid = m.param1.removePrefix(applyMutationsEndRange.begin);
|
|
auto& p = (*uid_applyMutationsData)[uid];
|
|
p.endVersion = BinaryReader::fromStringRef<Version>(m.param2, Unversioned());
|
|
if (p.keyVersion == Reference<KeyRangeMap<Version>>())
|
|
p.keyVersion = makeReference<KeyRangeMap<Version>>();
|
|
if (p.worker.isValid() && !p.worker.isReady()) {
|
|
return;
|
|
}
|
|
auto addPrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsAddPrefixRange.begin)).get();
|
|
auto removePrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsRemovePrefixRange.begin)).get();
|
|
auto beginValue = txnStateStore->readValue(uid.withPrefix(applyMutationsBeginRange.begin)).get();
|
|
p.worker = applyMutations(
|
|
cx,
|
|
uid,
|
|
addPrefixValue.present() ? addPrefixValue.get() : Key(),
|
|
removePrefixValue.present() ? removePrefixValue.get() : Key(),
|
|
beginValue.present() ? BinaryReader::fromStringRef<Version>(beginValue.get(), Unversioned()) : 0,
|
|
&p.endVersion,
|
|
commit,
|
|
committedVersion,
|
|
p.keyVersion);
|
|
}
|
|
|
|
void checkSetApplyMutationsKeyVersionMapRange(MutationRef m) {
|
|
if (!m.param1.startsWith(applyMutationsKeyVersionMapRange.begin)) {
|
|
return;
|
|
}
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
|
|
if (uid_applyMutationsData == nullptr) {
|
|
return;
|
|
}
|
|
if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) {
|
|
Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID));
|
|
Key k = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID));
|
|
auto& p = (*uid_applyMutationsData)[uid];
|
|
if (p.keyVersion == Reference<KeyRangeMap<Version>>())
|
|
p.keyVersion = makeReference<KeyRangeMap<Version>>();
|
|
p.keyVersion->rawInsert(k, BinaryReader::fromStringRef<Version>(m.param2, Unversioned()));
|
|
}
|
|
}
|
|
|
|
void checkSetLogRangesRange(MutationRef m) {
|
|
if (!m.param1.startsWith(logRangesRange.begin)) {
|
|
return;
|
|
}
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
if (!vecBackupKeys) {
|
|
return;
|
|
}
|
|
Key logDestination;
|
|
KeyRef logRangeBegin = logRangesDecodeKey(m.param1, nullptr);
|
|
Key logRangeEnd = logRangesDecodeValue(m.param2, &logDestination);
|
|
|
|
// Insert the logDestination into each range of vecBackupKeys overlapping the decoded range
|
|
for (auto& logRange : vecBackupKeys->modify(KeyRangeRef(logRangeBegin, logRangeEnd))) {
|
|
logRange->value().insert(logDestination);
|
|
}
|
|
for (auto& logRange : vecBackupKeys->modify(singleKeyRange(metadataVersionKey))) {
|
|
logRange->value().insert(logDestination);
|
|
}
|
|
|
|
TraceEvent("LogRangeAdd")
|
|
.detail("LogRanges", vecBackupKeys->size())
|
|
.detail("MutationKey", m.param1)
|
|
.detail("LogRangeBegin", logRangeBegin)
|
|
.detail("LogRangeEnd", logRangeEnd);
|
|
}
|
|
|
|
void checkSetGlobalKeys(MutationRef m) {
|
|
if (!m.param1.startsWith(globalKeysPrefix)) {
|
|
return;
|
|
}
|
|
if (!toCommit) {
|
|
return;
|
|
}
|
|
|
|
// Notifies all servers that a Master's server epoch ends
|
|
auto allServers = txnStateStore->readRange(serverTagKeys).get();
|
|
std::set<Tag> allTags;
|
|
|
|
if (m.param1 == killStorageKey) {
|
|
int8_t safeLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
|
|
for (auto& kv : allServers) {
|
|
Tag t = decodeServerTagValue(kv.value);
|
|
if (t.locality != safeLocality) {
|
|
allTags.insert(t);
|
|
}
|
|
}
|
|
} else {
|
|
for (auto& kv : allServers) {
|
|
allTags.insert(decodeServerTagValue(kv.value));
|
|
}
|
|
}
|
|
allTags.insert(cacheTag);
|
|
|
|
if (m.param1 == lastEpochEndKey) {
|
|
toCommit->addTags(allTags);
|
|
toCommit->writeTypedMessage(LogProtocolMessage(), true);
|
|
}
|
|
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
toCommit->addTags(allTags);
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
|
|
// Generates private mutations for the target storage server, instructing it to create a checkpoint.
|
|
void checkSetCheckpointKeys(MutationRef m) {
|
|
if (!m.param1.startsWith(checkpointPrefix)) {
|
|
return;
|
|
}
|
|
if (toCommit) {
|
|
CheckpointMetaData checkpoint = decodeCheckpointValue(m.param2);
|
|
Tag tag = decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(checkpoint.ssID)).get().get());
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
TraceEvent("SendingPrivateMutationCheckpoint", dbgid)
|
|
.detail("Original", m)
|
|
.detail("Privatized", privatized)
|
|
.detail("Server", checkpoint.ssID)
|
|
.detail("TagKey", serverTagKeyFor(checkpoint.ssID))
|
|
.detail("Tag", tag.toString())
|
|
.detail("Checkpoint", checkpoint.toString());
|
|
|
|
toCommit->addTag(tag);
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
|
|
void checkSetOtherKeys(MutationRef m) {
|
|
if (initialCommit)
|
|
return;
|
|
if (m.param1 == databaseLockedKey || m.param1 == metadataVersionKey ||
|
|
m.param1 == mustContainSystemMutationsKey || m.param1.startsWith(applyMutationsBeginRange.begin) ||
|
|
m.param1.startsWith(applyMutationsAddPrefixRange.begin) ||
|
|
m.param1.startsWith(applyMutationsRemovePrefixRange.begin) || m.param1.startsWith(tagLocalityListPrefix) ||
|
|
m.param1.startsWith(serverTagHistoryPrefix) ||
|
|
m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin) || m.param1 == clusterIdKey) {
|
|
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
}
|
|
}
|
|
|
|
void checkSetMinRequiredCommitVersionKey(MutationRef m) {
|
|
if (m.param1 != minRequiredCommitVersionKey) {
|
|
return;
|
|
}
|
|
Version requested = BinaryReader::fromStringRef<Version>(m.param2, Unversioned());
|
|
TraceEvent("MinRequiredCommitVersion", dbgid).detail("Min", requested).detail("Current", popVersion);
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
confChange = true;
|
|
TEST(true); // Recovering at a higher version.
|
|
}
|
|
|
|
void checkSetWriteRecoverKey(MutationRef m) {
|
|
if (m.param1 != writeRecoveryKey) {
|
|
return;
|
|
}
|
|
TraceEvent("WriteRecoveryKeySet", dbgid).log();
|
|
if (!initialCommit)
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
TEST(true); // Snapshot created, setting writeRecoveryKey in txnStateStore
|
|
}
|
|
|
|
void checkSetTenantMapPrefix(MutationRef m) {
|
|
if (m.param1.startsWith(tenantMapPrefix)) {
|
|
if (tenantMap) {
|
|
ASSERT(version != invalidVersion);
|
|
Standalone<StringRef> tenantName = m.param1.removePrefix(tenantMapPrefix);
|
|
TenantMapEntry tenantEntry = decodeTenantEntry(m.param2);
|
|
|
|
TraceEvent("CommitProxyInsertTenant", dbgid).detail("Tenant", tenantName).detail("Version", version);
|
|
(*tenantMap)[tenantName] = tenantEntry;
|
|
}
|
|
|
|
if (!initialCommit) {
|
|
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
|
}
|
|
|
|
// For now, this goes to all storage servers.
|
|
// Eventually, we can have each SS store tenants that apply only to the data stored on it.
|
|
if (toCommit) {
|
|
std::set<Tag> allTags;
|
|
auto allServers = txnStateStore->readRange(serverTagKeys).get();
|
|
for (auto& kv : allServers) {
|
|
allTags.insert(decodeServerTagValue(kv.value));
|
|
}
|
|
|
|
toCommit->addTags(allTags);
|
|
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
|
|
TEST(true); // Tenant added to map
|
|
}
|
|
}
|
|
|
|
void checkClearKeyServerKeys(KeyRangeRef range) {
|
|
if (!keyServersKeys.intersects(range)) {
|
|
return;
|
|
}
|
|
KeyRangeRef r = range & keyServersKeys;
|
|
if (keyInfo) {
|
|
KeyRangeRef clearRange(r.begin.removePrefix(keyServersPrefix), r.end.removePrefix(keyServersPrefix));
|
|
keyInfo->insert(clearRange,
|
|
clearRange.begin == StringRef()
|
|
? ServerCacheInfo()
|
|
: keyInfo->rangeContainingKeyBefore(clearRange.begin).value());
|
|
}
|
|
|
|
if (!initialCommit)
|
|
txnStateStore->clear(r);
|
|
}
|
|
|
|
void checkClearConfigKeys(MutationRef m, KeyRangeRef range) {
|
|
if (!configKeys.intersects(range)) {
|
|
return;
|
|
}
|
|
if (!initialCommit)
|
|
txnStateStore->clear(range & configKeys);
|
|
if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range) &&
|
|
!excludedLocalityKeys.contains(range) && !failedLocalityKeys.contains(range)) {
|
|
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m);
|
|
confChange = true;
|
|
}
|
|
}
|
|
|
|
void checkClearServerListKeys(KeyRangeRef range) {
|
|
if (!serverListKeys.intersects(range)) {
|
|
return;
|
|
}
|
|
if (initialCommit) {
|
|
return;
|
|
}
|
|
KeyRangeRef rangeToClear = range & serverListKeys;
|
|
if (rangeToClear.singleKeyRange()) {
|
|
UID id = decodeServerListKey(rangeToClear.begin);
|
|
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(id)).get();
|
|
if (ssiV.present() && decodeServerListValue(ssiV.get()).isTss()) {
|
|
tssServerListToRemove.push_back(rangeToClear);
|
|
} else {
|
|
txnStateStore->clear(rangeToClear);
|
|
}
|
|
} else {
|
|
txnStateStore->clear(rangeToClear);
|
|
}
|
|
}
|
|
|
|
void checkClearTagLocalityListKeys(KeyRangeRef range) {
|
|
if (!tagLocalityListKeys.intersects(range)) {
|
|
return;
|
|
}
|
|
if (initialCommit) {
|
|
return;
|
|
}
|
|
txnStateStore->clear(range & tagLocalityListKeys);
|
|
}
|
|
|
|
void checkClearServerTagKeys(MutationRef m, KeyRangeRef range) {
|
|
if (!serverTagKeys.intersects(range)) {
|
|
return;
|
|
}
|
|
|
|
// Storage server removal always happens in a separate version from any prior writes (or any subsequent
|
|
// reuse of the tag) so we can safely destroy the tag here without any concern about intra-batch
|
|
// ordering
|
|
if (logSystem && popVersion) {
|
|
auto serverKeysCleared =
|
|
txnStateStore->readRange(range & serverTagKeys).get(); // read is expected to be immediately available
|
|
for (auto& kv : serverKeysCleared) {
|
|
Tag tag = decodeServerTagValue(kv.value);
|
|
TraceEvent("ServerTagRemove")
|
|
.detail("PopVersion", popVersion)
|
|
.detail("Tag", tag.toString())
|
|
.detail("Server", decodeServerTagKey(kv.key));
|
|
logSystem->pop(popVersion, decodeServerTagValue(kv.value));
|
|
(*tag_popped)[tag] = popVersion;
|
|
|
|
if (toCommit) {
|
|
MutationRef privatized = m;
|
|
privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena);
|
|
privatized.param2 = keyAfter(kv.key, arena).withPrefix(systemKeys.begin, arena);
|
|
|
|
toCommit->addTag(decodeServerTagValue(kv.value));
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
// Might be a tss removal, which doesn't store a tag there.
|
|
// Chained if is a little verbose, but avoids unnecessary work
|
|
if (toCommit && !initialCommit && !serverKeysCleared.size()) {
|
|
KeyRangeRef maybeTssRange = range & serverTagKeys;
|
|
if (maybeTssRange.singleKeyRange()) {
|
|
UID id = decodeServerTagKey(maybeTssRange.begin);
|
|
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(id)).get();
|
|
|
|
if (ssiV.present()) {
|
|
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
|
|
if (ssi.isTss()) {
|
|
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
|
if (tagV.present()) {
|
|
MutationRef privatized = m;
|
|
privatized.param1 = maybeTssRange.begin.withPrefix(systemKeys.begin, arena);
|
|
privatized.param2 =
|
|
keyAfter(maybeTssRange.begin, arena).withPrefix(systemKeys.begin, arena);
|
|
|
|
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!initialCommit) {
|
|
KeyRangeRef clearRange = range & serverTagKeys;
|
|
txnStateStore->clear(clearRange);
|
|
if (storageCache && clearRange.singleKeyRange()) {
|
|
storageCache->erase(decodeServerTagKey(clearRange.begin));
|
|
}
|
|
}
|
|
}
|
|
|
|
void checkClearServerTagHistoryKeys(KeyRangeRef range) {
|
|
if (!serverTagHistoryKeys.intersects(range)) {
|
|
return;
|
|
}
|
|
// Once a tag has been removed from history we should pop it, since we no longer have a record of the
|
|
// tag once it has been removed from history
|
|
if (logSystem && popVersion) {
|
|
auto serverKeysCleared = txnStateStore->readRange(range & serverTagHistoryKeys)
|
|
.get(); // read is expected to be immediately available
|
|
for (auto& kv : serverKeysCleared) {
|
|
Tag tag = decodeServerTagValue(kv.value);
|
|
TraceEvent("ServerTagHistoryRemove")
|
|
.detail("PopVersion", popVersion)
|
|
.detail("Tag", tag.toString())
|
|
.detail("Version", decodeServerTagHistoryKey(kv.key));
|
|
logSystem->pop(popVersion, tag);
|
|
(*tag_popped)[tag] = popVersion;
|
|
}
|
|
}
|
|
if (!initialCommit)
|
|
txnStateStore->clear(range & serverTagHistoryKeys);
|
|
}
|
|
|
|
void checkClearApplyMutationsEndRange(MutationRef m, KeyRangeRef range) {
|
|
if (!range.intersects(applyMutationsEndRange)) {
|
|
return;
|
|
}
|
|
KeyRangeRef commonEndRange(range & applyMutationsEndRange);
|
|
if (!initialCommit)
|
|
txnStateStore->clear(commonEndRange);
|
|
if (uid_applyMutationsData != nullptr) {
|
|
uid_applyMutationsData->erase(
|
|
uid_applyMutationsData->lower_bound(m.param1.substr(applyMutationsEndRange.begin.size())),
|
|
m.param2 == applyMutationsEndRange.end
|
|
? uid_applyMutationsData->end()
|
|
: uid_applyMutationsData->lower_bound(m.param2.substr(applyMutationsEndRange.begin.size())));
|
|
}
|
|
}
|
|
|
|
void checkClearApplyMutationKeyVersionMapRange(MutationRef m, KeyRangeRef range) {
|
|
if (!range.intersects(applyMutationsKeyVersionMapRange)) {
|
|
return;
|
|
}
|
|
KeyRangeRef commonApplyRange(range & applyMutationsKeyVersionMapRange);
|
|
if (!initialCommit)
|
|
txnStateStore->clear(commonApplyRange);
|
|
if (uid_applyMutationsData == nullptr) {
|
|
return;
|
|
}
|
|
if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID) &&
|
|
m.param2.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) {
|
|
Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID));
|
|
Key uid2 = m.param2.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID));
|
|
|
|
if (uid == uid2) {
|
|
auto& p = (*uid_applyMutationsData)[uid];
|
|
if (p.keyVersion == Reference<KeyRangeMap<Version>>())
|
|
p.keyVersion = makeReference<KeyRangeMap<Version>>();
|
|
p.keyVersion->rawErase(
|
|
KeyRangeRef(m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)),
|
|
m.param2.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID))));
|
|
}
|
|
}
|
|
}
|
|
|
|
void checkClearLogRangesRange(KeyRangeRef range) {
|
|
if (!range.intersects(logRangesRange)) {
|
|
return;
|
|
}
|
|
KeyRangeRef commonLogRange(range & logRangesRange);
|
|
|
|
TraceEvent("LogRangeClear")
|
|
.detail("RangeBegin", range.begin)
|
|
.detail("RangeEnd", range.end)
|
|
.detail("IntersectBegin", commonLogRange.begin)
|
|
.detail("IntersectEnd", commonLogRange.end);
|
|
|
|
// Remove the key range from the vector, if defined
|
|
if (vecBackupKeys) {
|
|
KeyRef logKeyBegin;
|
|
Key logKeyEnd, logDestination;
|
|
|
|
// Identify the backup keys being removed
|
|
// read is expected to be immediately available
|
|
auto logRangesAffected = txnStateStore->readRange(commonLogRange).get();
|
|
|
|
TraceEvent("LogRangeClearBegin").detail("AffectedLogRanges", logRangesAffected.size());
|
|
|
|
// Add the backup name to the backup locations that do not have it
|
|
for (auto logRangeAffected : logRangesAffected) {
|
|
// Parse the backup key and name
|
|
logKeyBegin = logRangesDecodeKey(logRangeAffected.key, nullptr);
|
|
|
|
// Decode the log destination and key value
|
|
logKeyEnd = logRangesDecodeValue(logRangeAffected.value, &logDestination);
|
|
|
|
TraceEvent("LogRangeErase")
|
|
.detail("AffectedKey", logRangeAffected.key)
|
|
.detail("AffectedValue", logRangeAffected.value)
|
|
.detail("LogKeyBegin", logKeyBegin)
|
|
.detail("LogKeyEnd", logKeyEnd)
|
|
.detail("LogDestination", logDestination);
|
|
|
|
// Identify the locations to place the backup key
|
|
auto logRanges = vecBackupKeys->modify(KeyRangeRef(logKeyBegin, logKeyEnd));
|
|
|
|
// Remove the log prefix from the ranges which include it
|
|
for (auto logRange : logRanges) {
|
|
auto& logRangeMap = logRange->value();
|
|
|
|
// Remove the backup name from the range
|
|
logRangeMap.erase(logDestination);
|
|
}
|
|
|
|
bool foundKey = false;
|
|
for (auto& it : vecBackupKeys->intersectingRanges(normalKeys)) {
|
|
if (it.value().count(logDestination) > 0) {
|
|
foundKey = true;
|
|
break;
|
|
}
|
|
}
|
|
if (!foundKey) {
|
|
auto logRanges = vecBackupKeys->modify(singleKeyRange(metadataVersionKey));
|
|
for (auto logRange : logRanges) {
|
|
auto& logRangeMap = logRange->value();
|
|
logRangeMap.erase(logDestination);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Coallesce the entire range
|
|
vecBackupKeys->coalesce(allKeys);
|
|
}
|
|
|
|
if (!initialCommit)
|
|
txnStateStore->clear(commonLogRange);
|
|
}
|
|
|
|
void checkClearTssMappingKeys(MutationRef m, KeyRangeRef range) {
|
|
if (!tssMappingKeys.intersects(range)) {
|
|
return;
|
|
}
|
|
KeyRangeRef rangeToClear = range & tssMappingKeys;
|
|
ASSERT(rangeToClear.singleKeyRange());
|
|
|
|
// Normally uses key backed map, so have to use same unpacking code here.
|
|
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
|
|
if (!initialCommit) {
|
|
txnStateStore->clear(rangeToClear);
|
|
}
|
|
|
|
if (tssMapping) {
|
|
tssMapping->erase(ssId);
|
|
}
|
|
|
|
if (!toCommit) {
|
|
return;
|
|
}
|
|
// send private mutation to SS to notify that it no longer has a tss pair
|
|
if (Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); tagV.present()) {
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
|
|
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
|
|
void checkClearTssQuarantineKeys(MutationRef m, KeyRangeRef range) {
|
|
if (!tssQuarantineKeys.intersects(range) || initialCommit) {
|
|
return;
|
|
}
|
|
|
|
KeyRangeRef rangeToClear = range & tssQuarantineKeys;
|
|
ASSERT(rangeToClear.singleKeyRange());
|
|
txnStateStore->clear(rangeToClear);
|
|
|
|
if (!toCommit) {
|
|
return;
|
|
}
|
|
UID tssId = decodeTssQuarantineKey(m.param1);
|
|
if (Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get(); ssiV.present()) {
|
|
if (StorageServerInterface ssi = decodeServerListValue(ssiV.get()); ssi.isTss()) {
|
|
if (Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
|
tagV.present()) {
|
|
|
|
MutationRef privatized = m;
|
|
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
|
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
|
|
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void checkClearTenantMapPrefix(KeyRangeRef range) {
|
|
if (tenantMapKeys.intersects(range)) {
|
|
if (tenantMap) {
|
|
ASSERT(version != invalidVersion);
|
|
|
|
StringRef startTenant = std::max(range.begin, tenantMapPrefix).removePrefix(tenantMapPrefix);
|
|
StringRef endTenant = (range.end.startsWith(tenantMapPrefix) ? range.end : tenantMapKeys.end)
|
|
.removePrefix(tenantMapPrefix);
|
|
|
|
TraceEvent("CommitProxyEraseTenants", dbgid)
|
|
.detail("BeginTenant", startTenant)
|
|
.detail("EndTenant", endTenant)
|
|
.detail("Version", version);
|
|
|
|
auto startItr = tenantMap->lower_bound(startTenant);
|
|
auto endItr = tenantMap->lower_bound(endTenant);
|
|
tenantMap->erase(startItr, endItr);
|
|
}
|
|
|
|
if (!initialCommit) {
|
|
txnStateStore->clear(range);
|
|
}
|
|
|
|
// For now, this goes to all storage servers.
|
|
// Eventually, we can have each SS store tenants that apply only to the data stored on it.
|
|
if (toCommit) {
|
|
std::set<Tag> allTags;
|
|
auto allServers = txnStateStore->readRange(serverTagKeys).get();
|
|
for (auto& kv : allServers) {
|
|
allTags.insert(decodeServerTagValue(kv.value));
|
|
}
|
|
|
|
toCommit->addTags(allTags);
|
|
|
|
MutationRef privatized;
|
|
privatized.type = MutationRef::ClearRange;
|
|
privatized.param1 = range.begin.withPrefix(systemKeys.begin, arena);
|
|
privatized.param2 = range.end.withPrefix(systemKeys.begin, arena);
|
|
toCommit->writeTypedMessage(privatized);
|
|
}
|
|
|
|
TEST(true); // Tenant cleared from map
|
|
}
|
|
}
|
|
|
|
void checkClearMiscRangeKeys(KeyRangeRef range) {
|
|
if (initialCommit) {
|
|
return;
|
|
}
|
|
if (range.contains(coordinatorsKey)) {
|
|
txnStateStore->clear(singleKeyRange(coordinatorsKey));
|
|
}
|
|
if (range.contains(databaseLockedKey)) {
|
|
txnStateStore->clear(singleKeyRange(databaseLockedKey));
|
|
}
|
|
if (range.contains(metadataVersionKey)) {
|
|
txnStateStore->clear(singleKeyRange(metadataVersionKey));
|
|
}
|
|
if (range.contains(mustContainSystemMutationsKey)) {
|
|
txnStateStore->clear(singleKeyRange(mustContainSystemMutationsKey));
|
|
}
|
|
if (range.contains(writeRecoveryKey)) {
|
|
txnStateStore->clear(singleKeyRange(writeRecoveryKey));
|
|
}
|
|
if (range.intersects(testOnlyTxnStateStorePrefixRange)) {
|
|
txnStateStore->clear(range & testOnlyTxnStateStorePrefixRange);
|
|
}
|
|
}
|
|
|
|
// If we accumulated private mutations for cached key-ranges, we also need to
|
|
// tag them with the relevant storage servers. This is done to make the storage
|
|
// servers aware of the cached key-ranges
|
|
// NOTE: we are assuming non-colliding cached key-ranges
|
|
|
|
// TODO Note that, we are currently not handling the case when cached key-ranges move out
|
|
// to different storage servers. This would require some checking when keys in the keyServersPrefix change.
|
|
// For the first implementation, we could just send the entire map to every storage server. Revisit!
|
|
void tagStorageServersForCachedKeyRanges() {
|
|
if (cachedRangeInfo.size() == 0 || !toCommit) {
|
|
return;
|
|
}
|
|
|
|
std::map<KeyRef, MutationRef>::iterator itr;
|
|
KeyRef keyBegin, keyEnd;
|
|
std::vector<uint16_t> serverIndices;
|
|
MutationRef mutationBegin, mutationEnd;
|
|
|
|
for (itr = cachedRangeInfo.begin(); itr != cachedRangeInfo.end(); ++itr) {
|
|
// first figure out the begin and end keys for the cached-range,
|
|
// the begin and end mutations can be in any order
|
|
decodeStorageCacheValue(itr->second.param2, serverIndices);
|
|
// serverIndices count should be greater than zero for beginKey mutations
|
|
if (serverIndices.size() > 0) {
|
|
keyBegin = itr->first;
|
|
mutationBegin = itr->second;
|
|
++itr;
|
|
if (itr != cachedRangeInfo.end()) {
|
|
keyEnd = itr->first;
|
|
mutationEnd = itr->second;
|
|
} else {
|
|
//TraceEvent(SevDebug, "EndKeyNotFound", dbgid).detail("KeyBegin", keyBegin.toString());
|
|
break;
|
|
}
|
|
} else {
|
|
keyEnd = itr->first;
|
|
mutationEnd = itr->second;
|
|
++itr;
|
|
if (itr != cachedRangeInfo.end()) {
|
|
keyBegin = itr->first;
|
|
mutationBegin = itr->second;
|
|
} else {
|
|
//TraceEvent(SevDebug, "BeginKeyNotFound", dbgid).detail("KeyEnd", keyEnd.toString());
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Now get all the storage server tags for the cached key-ranges
|
|
std::set<Tag> allTags;
|
|
auto ranges = keyInfo->intersectingRanges(KeyRangeRef(keyBegin, keyEnd));
|
|
for (auto it : ranges) {
|
|
auto& r = it.value();
|
|
for (auto info : r.src_info) {
|
|
allTags.insert(info->tag);
|
|
}
|
|
for (auto info : r.dest_info) {
|
|
allTags.insert(info->tag);
|
|
}
|
|
}
|
|
|
|
// Add the tags to both begin and end mutations
|
|
toCommit->addTags(allTags);
|
|
toCommit->writeTypedMessage(mutationBegin);
|
|
toCommit->addTags(allTags);
|
|
toCommit->writeTypedMessage(mutationEnd);
|
|
}
|
|
}
|
|
|
|
public:
|
|
void apply() {
|
|
for (auto const& m : mutations) {
|
|
if (toCommit) {
|
|
toCommit->addTransactionInfo(spanContext);
|
|
}
|
|
|
|
if (m.type == MutationRef::SetValue && isSystemKey(m.param1)) {
|
|
checkSetKeyServersPrefix(m);
|
|
checkSetServerKeysPrefix(m);
|
|
checkSetCheckpointKeys(m);
|
|
checkSetServerTagsPrefix(m);
|
|
checkSetStorageCachePrefix(m);
|
|
checkSetCacheKeysPrefix(m);
|
|
checkSetConfigKeys(m);
|
|
checkSetServerListPrefix(m);
|
|
checkSetChangeFeedPrefix(m);
|
|
checkSetTSSMappingKeys(m);
|
|
checkSetTSSQuarantineKeys(m);
|
|
checkSetApplyMutationsEndRange(m);
|
|
checkSetApplyMutationsKeyVersionMapRange(m);
|
|
checkSetLogRangesRange(m);
|
|
checkSetGlobalKeys(m);
|
|
checkSetWriteRecoverKey(m);
|
|
checkSetMinRequiredCommitVersionKey(m);
|
|
checkSetTenantMapPrefix(m);
|
|
checkSetOtherKeys(m);
|
|
} else if (m.type == MutationRef::ClearRange && isSystemKey(m.param2)) {
|
|
KeyRangeRef range(m.param1, m.param2);
|
|
|
|
checkClearKeyServerKeys(range);
|
|
checkClearConfigKeys(m, range);
|
|
checkClearServerListKeys(range);
|
|
checkClearTagLocalityListKeys(range);
|
|
checkClearServerTagKeys(m, range);
|
|
checkClearServerTagHistoryKeys(range);
|
|
checkClearApplyMutationsEndRange(m, range);
|
|
checkClearApplyMutationKeyVersionMapRange(m, range);
|
|
checkClearLogRangesRange(range);
|
|
checkClearTssMappingKeys(m, range);
|
|
checkClearTssQuarantineKeys(m, range);
|
|
checkClearTenantMapPrefix(range);
|
|
checkClearMiscRangeKeys(range);
|
|
}
|
|
}
|
|
|
|
for (KeyRangeRef& range : tssServerListToRemove) {
|
|
txnStateStore->clear(range);
|
|
}
|
|
|
|
for (auto& tssPair : tssMappingToAdd) {
|
|
// read tss server list from txn state store and add it to tss mapping
|
|
StorageServerInterface tssi =
|
|
decodeServerListValue(txnStateStore->readValue(serverListKeyFor(tssPair.second)).get().get());
|
|
(*tssMapping)[tssPair.first] = tssi;
|
|
}
|
|
|
|
tagStorageServersForCachedKeyRanges();
|
|
}
|
|
};
|
|
|
|
} // anonymous namespace
|
|
|
|
void applyMetadataMutations(SpanID const& spanContext,
|
|
ProxyCommitData& proxyCommitData,
|
|
Arena& arena,
|
|
Reference<ILogSystem> logSystem,
|
|
const VectorRef<MutationRef>& mutations,
|
|
LogPushData* toCommit,
|
|
bool& confChange,
|
|
Version version,
|
|
Version popVersion,
|
|
bool initialCommit) {
|
|
|
|
ApplyMetadataMutationsImpl(spanContext,
|
|
arena,
|
|
mutations,
|
|
proxyCommitData,
|
|
logSystem,
|
|
toCommit,
|
|
confChange,
|
|
version,
|
|
popVersion,
|
|
initialCommit)
|
|
.apply();
|
|
}
|
|
|
|
void applyMetadataMutations(SpanID const& spanContext,
|
|
const UID& dbgid,
|
|
Arena& arena,
|
|
const VectorRef<MutationRef>& mutations,
|
|
IKeyValueStore* txnStateStore) {
|
|
ApplyMetadataMutationsImpl(spanContext, dbgid, arena, mutations, txnStateStore).apply();
|
|
}
|