TSS Mapping Change

Josh Slocum 2021-05-28 18:15:52 +00:00
parent 6f5ae9d76a
commit b3e4f182ef
22 changed files with 463 additions and 370 deletions

View File

@ -116,30 +116,18 @@ struct ClientDBInfo {
firstCommitProxy; // not serialized, used for commitOnFirstProxy when the commit proxies vector has been shrunk
Optional<Value> forward;
vector<VersionHistory> history;
vector<std::pair<UID, StorageServerInterface>>
tssMapping; // logically map<ssid, tss interface> for all active TSS pairs
ClientDBInfo() {}
bool operator==(ClientDBInfo const& r) const { return id == r.id; }
bool operator!=(ClientDBInfo const& r) const { return id != r.id; }
// convenience method to treat tss mapping like a map
Optional<StorageServerInterface> getTssPair(UID storageServerID) const {
for (auto& it : tssMapping) {
if (it.first == storageServerID) {
return Optional<StorageServerInterface>(it.second);
}
}
return Optional<StorageServerInterface>();
}
template <class Archive>
void serialize(Archive& ar) {
if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, grvProxies, commitProxies, id, forward, history, tssMapping);
serializer(ar, grvProxies, commitProxies, id, forward, history);
}
};
@ -300,9 +288,12 @@ struct GetKeyServerLocationsReply {
Arena arena;
std::vector<std::pair<KeyRangeRef, vector<StorageServerInterface>>> results;
// if any storage servers in results have a TSS pair, that mapping is in here
std::vector<std::pair<UID, StorageServerInterface>> resultsTssMapping;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, results, arena);
serializer(ar, results, resultsTssMapping, arena);
}
};
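With tssMapping and getTssPair() removed from ClientDBInfo, clients now receive pairs through resultsTssMapping and cache them in the DatabaseContext map added below. A minimal sketch of the replacement lookup (the call site is hypothetical):

// O(1) lookup against DatabaseContext::tssMapping, replacing the removed
// linear scan in ClientDBInfo::getTssPair() (cx is a DatabaseContext*).
auto it = cx->tssMapping.find(storageServerID);
if (it != cx->tssMapping.end()) {
    const StorageServerInterface& tssi = it->second; // active TSS pair for this SS
}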

View File

@ -323,7 +323,10 @@ public:
std::map<UID, StorageServerInfo*> server_interf;
std::map<UID, Reference<TSSMetrics>> tssMetrics;
// map from ssid -> tss interface
std::unordered_map<UID, StorageServerInterface> tssMapping;
// map from tssid -> metrics for that tss pair
std::unordered_map<UID, Reference<TSSMetrics>> tssMetrics;
UID dbId;
bool internal; // Only contexts created through the C client and fdbcli are non-internal
@ -425,8 +428,8 @@ public:
static const std::vector<std::string> debugTransactionTagChoices;
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
void maybeAddTssMapping(StorageServerInterface const& ssi);
void addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi);
void removeTssMapping(StorageServerInterface const& ssi);
};
#endif

View File

@ -122,38 +122,50 @@ NetworkOptions::NetworkOptions()
static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");
static const Key CLIENT_LATENCY_INFO_CTR_PREFIX = LiteralStringRef("client_latency_counter/");
void DatabaseContext::maybeAddTssMapping(StorageServerInterface const& ssi) {
// add tss mapping if server is new
void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi) {
auto result = tssMapping.find(ssi.id());
// Update tss endpoint mapping if ss isn't in mapping, or the interface it maps to has changed
if (result == tssMapping.end() ||
result->second.getValue.getEndpoint().token.first() != tssi.getValue.getEndpoint().token.first()) {
Reference<TSSMetrics> metrics;
if (result == tssMapping.end()) {
// new TSS pairing
metrics = makeReference<TSSMetrics>();
tssMetrics[tssi.id()] = metrics;
tssMapping[ssi.id()] = tssi;
} else {
if (result->second.id() == tssi.id()) {
metrics = tssMetrics[tssi.id()];
} else {
TEST(true); // SS now maps to new TSS! This will probably never happen in practice
tssMetrics.erase(result->second.id());
metrics = makeReference<TSSMetrics>();
tssMetrics[tssi.id()] = metrics;
}
result->second = tssi;
}
Optional<StorageServerInterface> tssPair = clientInfo->get().getTssPair(ssi.id());
if (tssPair.present()) {
addTssMapping(ssi, tssPair.get());
queueModel.updateTssEndpoint(ssi.getValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getValue.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKey.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKey.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKeyValues.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKeyValues.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.watchValue.getEndpoint(), metrics));
}
}
// calling getInterface potentially recursively is weird, but since this function is only called when an entry is
// created/changed, the recursive call should never recurse further.
void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi) {
Reference<StorageServerInfo> tssInfo = StorageServerInfo::getInterface(this, tssi, clientLocality);
Reference<StorageServerInfo> ssInfo = StorageServerInfo::getInterface(this, ssi, clientLocality);
Reference<TSSMetrics> metrics = makeReference<TSSMetrics>();
tssMetrics[tssi.id()] = metrics;
// Add each read data request we want to duplicate to TSS to endpoint mapping (getValue, getKey, getKeyValues,
// watchValue)
queueModel.updateTssEndpoint(
ssInfo->interf.getValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssInfo->interf.getValue.getEndpoint(), metrics, clientInfo->get().id));
queueModel.updateTssEndpoint(
ssInfo->interf.getKey.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssInfo->interf.getKey.getEndpoint(), metrics, clientInfo->get().id));
queueModel.updateTssEndpoint(
ssInfo->interf.getKeyValues.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssInfo->interf.getKeyValues.getEndpoint(), metrics, clientInfo->get().id));
queueModel.updateTssEndpoint(
ssInfo->interf.watchValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssInfo->interf.watchValue.getEndpoint(), metrics, clientInfo->get().id));
void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
auto result = tssMapping.find(ssi.id());
if (result != tssMapping.end()) {
tssMetrics.erase(ssi.id());
tssMapping.erase(result);
queueModel.removeTssEndpoint(ssi.getValue.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getKey.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getKeyValues.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first());
}
}
Reference<StorageServerInfo> StorageServerInfo::getInterface(DatabaseContext* cx,
@ -170,12 +182,10 @@ Reference<StorageServerInfo> StorageServerInfo::getInterface(DatabaseContext* cx
// changes.
it->second->interf = ssi;
cx->maybeAddTssMapping(ssi);
} else {
it->second->notifyContextDestroyed();
Reference<StorageServerInfo> loc(new StorageServerInfo(cx, ssi, locality));
cx->server_interf[ssi.id()] = loc.getPtr();
cx->maybeAddTssMapping(ssi);
return loc;
}
}
@ -185,7 +195,6 @@ Reference<StorageServerInfo> StorageServerInfo::getInterface(DatabaseContext* cx
Reference<StorageServerInfo> loc(new StorageServerInfo(cx, ssi, locality));
cx->server_interf[ssi.id()] = loc.getPtr();
cx->maybeAddTssMapping(ssi);
return loc;
}
@ -813,45 +822,6 @@ ACTOR Future<Void> monitorCacheList(DatabaseContext* self) {
}
}
// updates tss mapping when set of tss servers changes
ACTOR static Future<Void> monitorTssChange(DatabaseContext* cx) {
state vector<std::pair<UID, StorageServerInterface>> curTssMapping;
curTssMapping = cx->clientInfo->get().tssMapping;
loop {
wait(cx->clientInfo->onChange());
if (cx->clientInfo->get().tssMapping != curTssMapping) {
// To optimize size of the ClientDBInfo payload, we could eventually change CC to just send a tss change
// id/generation, and have client reread the mapping here if it changed. It's a very minor optimization
// though, and would cause extra read load.
ClientDBInfo clientInfo = cx->clientInfo->get();
curTssMapping = clientInfo.tssMapping;
std::unordered_set<UID> seenTssIds;
if (curTssMapping.size()) {
for (const auto& it : curTssMapping) {
seenTssIds.insert(it.second.id());
if (cx->server_interf.count(it.first)) {
cx->addTssMapping(cx->server_interf[it.first]->interf, it.second);
}
}
}
for (auto it = cx->tssMetrics.begin(); it != cx->tssMetrics.end();) {
if (seenTssIds.count(it->first)) {
it++;
} else {
it = cx->tssMetrics.erase(it);
}
}
cx->queueModel.removeOldTssData(clientInfo.id);
}
}
}
ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
state Reference<ReadYourWritesTransaction> tr;
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
@ -860,7 +830,7 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
// find ss pair id so we can remove it from the mapping
state UID tssPairID;
bool found = false;
for (const auto& it : cx->clientInfo->get().tssMapping) {
for (const auto& it : cx->tssMapping) {
if (it.second.id() == tssID) {
tssPairID = it.first;
found = true;
@ -870,7 +840,7 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
if (found) {
TraceEvent(SevWarnAlways, "TSS_KillMismatch").detail("TSSID", tssID.toString());
TEST(true); // killing TSS because it got mismatch
// TODO we could write something to the system keyspace and then have DD listen to that keyspace and do exactly
// this, so why not just cut out the middle man (or the middle system keys, as it were)
tr = makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(cx)));
@ -883,7 +853,6 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
tr->clear(serverTagKeyFor(tssID));
tssMapDB.erase(tr, tssPairID);
tr->set(tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
wait(tr->commit());
break;
@ -1155,7 +1124,6 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
GlobalConfig::create(this, clientInfo);
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
monitorTssInfoChange = monitorTssChange(this);
tssMismatchHandler = handleTssMismatches(this);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);
@ -2216,6 +2184,29 @@ ACTOR Future<Optional<vector<StorageServerInterface>>> transactionalGetServerInt
return serverInterfaces;
}
void updateTssMappings(Database cx, const GetKeyServerLocationsReply& reply) {
// Since an ss -> tss mapping is included in resultsTssMapping iff that SS is in results and has a tss pair,
// all SS in results that do not have a mapping present must not have a tss pair.
std::unordered_map<UID, const StorageServerInterface*> ssiById;
for (const auto& [_, shard] : reply.results) {
for (auto& ssi : shard) {
ssiById[ssi.id()] = &ssi;
}
}
for (const auto& mapping : reply.resultsTssMapping) {
auto ssi = ssiById.find(mapping.first);
ASSERT(ssi != ssiById.end());
cx->addTssMapping(*ssi->second, mapping.second);
ssiById.erase(mapping.first);
}
// if SS didn't have a mapping above, it's still in the ssiById map, so remove its tss mapping
for (const auto& it : ssiById) {
cx->removeTssMapping(*it.second);
}
}
// If isBackward == true, returns the shard containing the key before 'key' (an infinitely long, inexpressible key).
// Otherwise returns the shard containing key
ACTOR Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation_internal(Database cx,
@ -2248,6 +2239,7 @@ ACTOR Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation_internal(Da
ASSERT(rep.results.size() == 1);
auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
updateTssMappings(cx, rep);
return std::make_pair(KeyRange(rep.results[0].first, rep.arena), locationInfo);
}
}
@ -2311,6 +2303,7 @@ ACTOR Future<vector<pair<KeyRange, Reference<LocationInfo>>>> getKeyRangeLocatio
cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second));
wait(yield());
}
updateTssMappings(cx, rep);
return results;
}

View File

@ -346,7 +346,6 @@ uint16_t cacheChangeKeyDecodeIndex(const KeyRef& key) {
return idx;
}
const KeyRef tssMappingChangeKey = LiteralStringRef("\xff\x02/tssMappingChangeKey");
const KeyRangeRef tssMappingKeys(LiteralStringRef("\xff/tss/"), LiteralStringRef("\xff/tss0"));
const KeyRangeRef serverTagKeys(LiteralStringRef("\xff/serverTag/"), LiteralStringRef("\xff/serverTag0"));

View File

@ -116,9 +116,7 @@ const Key cacheChangeKeyFor(uint16_t idx);
uint16_t cacheChangeKeyDecodeIndex(const KeyRef& key);
// "\xff/tss/[[serverId]]" := "[[tssId]]"
extern const KeyRef tssMappingChangeKey;
extern const KeyRangeRef tssMappingKeys;
extern const KeyRef tssMappingPrefix;
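The "\xff/tss/" keys follow the same Codec<UID>/Tuple packing that the KeyBackedMap uses elsewhere in this commit; a sketch of the encode/decode round trip, with hypothetical helper names:

// Pack: mirrors seedShardServers in MoveKeys.actor.cpp.
Key tssMappingKeyFor(UID ssId) {
    return Codec<UID>::pack(ssId).pack().withPrefix(tssMappingKeys.begin);
}
// Unpack: mirrors the private-mutation handling in applyMetadataMutations.
UID decodeTssMappingKey(KeyRef key) {
    return Codec<UID>::unpack(Tuple::unpack(key.removePrefix(tssMappingKeys.begin)));
}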
// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
// Provides the Tag for the given serverID. Used to access a

View File

@ -144,6 +144,7 @@ Future<Void> tssComparison(Req req,
: SevError;
if (!TSS_doCompare(req, src.get(), tss.get().get(), traceSeverity, tssData.tssId)) {
TEST(true); // TSS Mismatch
++tssData.metrics->mismatches;
}
} else if (tssLB.present() && tssLB.get().error.present()) {
@ -192,6 +193,7 @@ struct RequestData : NonCopyable {
Optional<TSSEndpointData> tssData = model->getTssData(stream->getEndpoint().token.first());
if (tssData.present()) {
TEST(true); // duplicating request to TSS
resetReply(request);
// FIXME: optimize to avoid creating new netNotifiedQueue for each message
RequestStream<Request> tssRequestStream(tssData.get().endpoint);

View File

@ -62,24 +62,12 @@ double QueueModel::addRequest(uint64_t id) {
void QueueModel::updateTssEndpoint(uint64_t endpointId, const TSSEndpointData& tssData) {
auto& d = data[endpointId];
if (!d.tssData.present()) {
tssCount++;
d.tssData = Optional<TSSEndpointData>(tssData);
} else {
d.tssData.get().generation = tssData.generation;
}
d.tssData = tssData;
}
void QueueModel::removeOldTssData(UID currentGeneration) {
if (tssCount > 0) {
// expire old tss mappings that aren't present in new mapping
for (auto& it : data) {
if (it.second.tssData.present() && it.second.tssData.get().generation != currentGeneration) {
it.second.tssData = Optional<TSSEndpointData>();
tssCount--;
}
}
}
void QueueModel::removeTssEndpoint(uint64_t endpointId) {
auto& d = data[endpointId];
d.tssData = Optional<TSSEndpointData>();
}
Optional<TSSEndpointData> QueueModel::getTssData(uint64_t id) {

View File

@ -33,10 +33,9 @@ struct TSSEndpointData {
UID tssId;
Endpoint endpoint;
Reference<TSSMetrics> metrics;
UID generation;
TSSEndpointData(UID tssId, Endpoint endpoint, Reference<TSSMetrics> metrics, UID generation)
: tssId(tssId), endpoint(endpoint), metrics(metrics), generation(generation) {}
TSSEndpointData(UID tssId, Endpoint endpoint, Reference<TSSMetrics> metrics)
: tssId(tssId), endpoint(endpoint), metrics(metrics) {}
};
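Registration keys the QueueModel entry on the SS read endpoint's token so load balancing can find the TSS duplicate, as DatabaseContext::addTssMapping does earlier in this diff. One endpoint's worth, as a sketch:

// Duplicate getValue requests: index by the SS endpoint token, carry the TSS endpoint.
queueModel.updateTssEndpoint(ssi.getValue.getEndpoint().token.first(),
                             TSSEndpointData(tssi.id(), tssi.getValue.getEndpoint(), metrics));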
// The data structure used for the client-side load balancing algorithm to
@ -111,10 +110,10 @@ public:
int laggingTSSCompareCount;
void updateTssEndpoint(uint64_t endpointId, const TSSEndpointData& endpointData);
void removeOldTssData(UID currentGeneration);
void removeTssEndpoint(uint64_t endpointId);
Optional<TSSEndpointData> getTssData(uint64_t endpointId);
QueueModel() : secondMultiplier(1.0), secondBudget(0), laggingRequestCount(0), tssCount(0) {
QueueModel() : secondMultiplier(1.0), secondBudget(0), laggingRequestCount(0) {
laggingRequests = actorCollection(addActor.getFuture(), &laggingRequestCount);
tssComparisons = actorCollection(addTSSActor.getFuture(), &laggingTSSCompareCount);
}
@ -126,7 +125,6 @@ public:
private:
std::unordered_map<uint64_t, QueueData> data;
uint32_t tssCount;
};
/* old queue model

View File

@ -19,6 +19,7 @@
*/
#include "fdbclient/MutationList.h"
#include "fdbclient/KeyBackedTypes.h" // for key backed map codecs for tss mapping
#include "fdbclient/SystemData.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/Notified.h"
@ -64,6 +65,7 @@ void applyMetadataMutations(SpanID const& spanContext,
NotifiedVersion* commitVersion,
std::map<UID, Reference<StorageInfo>>* storageCache,
std::map<Tag, Version>* tag_popped,
std::unordered_map<UID, StorageServerInterface>* tssMapping,
bool initialCommit) {
// std::map<keyRef, vector<uint16_t>> cacheRangeInfo;
std::map<KeyRef, MutationRef> cachedRangeInfo;
@ -72,7 +74,9 @@ void applyMetadataMutations(SpanID const& spanContext,
// tss + find partner's tag to send the private mutation. Since the removeStorageServer transaction clears both the
storage list and server tag, we have to enforce ordering, processing the server tag first, and postpone the
// server list clear until the end;
// Similarly, the TSS mapping change key needs to read the server list at the end of the commit
std::vector<KeyRangeRef> tssServerListToRemove;
std::vector<std::pair<UID, UID>> tssMappingToAdd;
for (auto const& m : mutations) {
//TraceEvent("MetadataMutation", dbgid).detail("M", m.toString());
@ -240,6 +244,29 @@ void applyMetadataMutations(SpanID const& spanContext,
}
}
}
} else if (m.param1.startsWith(tssMappingKeys.begin)) {
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
if (tssMapping) {
// This normally uses the key-backed map, so we have to use the same unpacking code here.
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
tssMappingToAdd.push_back(std::pair(ssId, tssId));
// send private mutation to SS that it now has a TSS pair
if (toCommit) {
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
toCommit->addTag(decodeServerTagValue(tagV.get()));
toCommit->writeTypedMessage(privatized);
}
}
}
}
} else if (m.param1 == databaseLockedKey || m.param1 == metadataVersionKey ||
m.param1 == mustContainSystemMutationsKey ||
m.param1.startsWith(applyMutationsBeginRange.begin) ||
@ -430,7 +457,7 @@ void applyMetadataMutations(SpanID const& spanContext,
}
// Might be a tss removal, which doesn't store a tag there.
// Chained if is a little verbose, but avoids unnecessary work
if (!initialCommit && !serverKeysCleared.size()) {
if (toCommit && !initialCommit && !serverKeysCleared.size()) {
KeyRangeRef maybeTssRange = range & serverTagKeys;
if (maybeTssRange.singleKeyRange()) {
UID id = decodeServerTagKey(maybeTssRange.begin);
@ -482,6 +509,19 @@ void applyMetadataMutations(SpanID const& spanContext,
if (!initialCommit)
txnStateStore->clear(range & serverTagHistoryKeys);
}
if (tssMappingKeys.intersects(range)) {
if (!initialCommit) {
KeyRangeRef rangeToClear = range & tssMappingKeys;
ASSERT(rangeToClear.singleKeyRange());
txnStateStore->clear(rangeToClear);
if (tssMapping) {
// This normally uses the key-backed map, so we have to use the same unpacking code here.
UID ssId =
Codec<UID>::unpack(Tuple::unpack(rangeToClear.begin.removePrefix(tssMappingKeys.begin)));
tssMapping->erase(ssId);
}
}
}
if (range.contains(coordinatorsKey)) {
if (!initialCommit)
txnStateStore->clear(singleKeyRange(coordinatorsKey));
@ -615,6 +655,13 @@ void applyMetadataMutations(SpanID const& spanContext,
txnStateStore->clear(range);
}
for (auto& tssPair : tssMappingToAdd) {
// read tss server list from txn state store and add it to tss mapping
StorageServerInterface tssi =
decodeServerListValue(txnStateStore->readValue(serverListKeyFor(tssPair.second)).get().get());
(*tssMapping)[tssPair.first] = tssi;
}
// If we accumulated private mutations for cached key-ranges, we also need to
// tag them with the relevant storage servers. This is done to make the storage
// servers aware of the cached key-ranges
@ -713,6 +760,7 @@ void applyMetadataMutations(SpanID const& spanContext,
&proxyCommitData.committedVersion,
&proxyCommitData.storageCache,
&proxyCommitData.tag_popped,
&proxyCommitData.tssMapping,
initialCommit);
}
@ -742,5 +790,6 @@ void applyMetadataMutations(SpanID const& spanContext,
/* commitVersion= */ nullptr,
/* storageCache= */ nullptr,
/* tag_popped= */ nullptr,
/* tssMapping= */ nullptr,
/* initialCommit= */ false);
}

View File

@ -103,6 +103,8 @@ set(FDBSERVER_SRCS
TesterInterface.actor.h
TLogInterface.h
TLogServer.actor.cpp
TSSMappingUtil.h
TSSMappingUtil.actor.cpp
VersionedBTree.actor.cpp
VFSAsync.h
VFSAsync.cpp

View File

@ -3387,7 +3387,6 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
clientInfo.id = deterministicRandom()->randomUniqueID();
clientInfo.commitProxies = req.commitProxies;
clientInfo.grvProxies = req.grvProxies;
clientInfo.tssMapping = db->clientInfo->get().tssMapping;
db->clientInfo->set(clientInfo);
dbInfo.client = db->clientInfo->get();
}
@ -3863,118 +3862,6 @@ ACTOR Future<Void> monitorServerInfoConfig(ClusterControllerData::DBInfo* db) {
}
}
// Monitors the tss mapping change key for changes,
// and broadcasts the new tss mapping to the rest of the cluster in ClientDBInfo.
ACTOR Future<Void> monitorTSSMapping(ClusterControllerData* self) {
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
loop {
state Reference<ReadYourWritesTransaction> tr =
Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(self->db.db));
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
std::vector<std::pair<UID, UID>> tssResults =
wait(tssMapDB.getRange(tr, UID(), Optional<UID>(), CLIENT_KNOBS->TOO_MANY));
ASSERT(tssResults.size() < CLIENT_KNOBS->TOO_MANY);
state std::unordered_map<UID, UID> tssIdMap;
std::set<UID> seenTssIds;
for (auto& it : tssResults) {
tssIdMap[it.first] = it.second;
// ensure two storage servers don't map to same TSS
ASSERT(seenTssIds.insert(it.second).second);
// ensure a storage server doesn't accidentally map to itself (unless we're in HACK_IDENTITY_MAPPING
// mode)
ASSERT(SERVER_KNOBS->TSS_HACK_IDENTITY_MAPPING || it.first != it.second);
}
state std::vector<std::pair<UID, StorageServerInterface>> newMapping;
state std::map<UID, StorageServerInterface> oldMapping;
state bool mappingChanged = false;
state ClientDBInfo clientInfo = self->db.clientInfo->get();
for (auto& it : clientInfo.tssMapping) {
oldMapping[it.first] = it.second;
if (!tssIdMap.count(it.first)) {
TraceEvent("TSS_MappingRemoved", self->id)
.detail("SSID", it.first)
.detail("TSSID", it.second.id());
mappingChanged = true;
}
}
for (auto& it : tssIdMap) {
bool ssAlreadyPaired = oldMapping.count(it.first);
state Optional<UID> oldTssId;
state Optional<UID> oldGetValueEndpoint;
if (ssAlreadyPaired) {
auto interf = oldMapping[it.first];
// check if this SS maps to a new TSS
oldTssId = Optional<UID>(interf.id());
oldGetValueEndpoint = Optional<UID>(interf.getValue.getEndpoint().token);
if (interf.id() != it.second) {
TraceEvent("TSS_MappingChanged", self->id)
.detail("SSID", it.first)
.detail("TSSID", it.second)
.detail("OldTSSID", interf.id());
mappingChanged = true;
}
} else {
TraceEvent("TSS_MappingAdded", self->id).detail("SSID", it.first).detail("TSSID", it.second);
mappingChanged = true;
}
state UID ssid = it.first;
state UID tssid = it.second;
// request storage server interface for tssid, add it to results
Optional<Value> tssiVal = wait(tr->get(serverListKeyFor(it.second)));
// because we read the tss mapping in the same transaction, there can be no races with tss removal
// and the tss interface must exist
ASSERT(tssiVal.present());
StorageServerInterface tssi = decodeServerListValue(tssiVal.get());
if (oldTssId.present() && tssi.id() == oldTssId.get() && oldGetValueEndpoint.present() &&
oldGetValueEndpoint.get() != tssi.getValue.getEndpoint().token) {
mappingChanged = true;
}
newMapping.push_back(std::pair<UID, StorageServerInterface>(ssid, tssi));
}
// if nothing changed, skip updating
if (mappingChanged) {
clientInfo.id = deterministicRandom()->randomUniqueID();
clientInfo.tssMapping = newMapping;
self->db.clientInfo->set(clientInfo);
ServerDBInfo serverInfo = self->db.serverInfo->get();
// also change server db info so workers get new mapping
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.infoGeneration = ++self->db.dbInfoCount;
serverInfo.client = clientInfo;
self->db.serverInfo->set(serverInfo);
}
state Future<Void> tssChangeFuture = tr->watch(tssMappingChangeKey);
wait(tr->commit());
wait(tssChangeFuture);
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
}
// Monitors the global configuration version key for changes. When changes are
// made, the global configuration history is read and any updates are sent to
// all processes in the system by updating the ClientDBInfo object. The
@ -4525,7 +4412,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(handleForcedRecoveries(&self, interf));
self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self));
self.addActor.send(monitorTSSMapping(&self));
// self.addActor.send(monitorTSSMapping(&self));
self.addActor.send(dbInfoUpdater(&self));
self.addActor.send(traceCounters("ClusterControllerMetrics",
self.id,

View File

@ -1275,7 +1275,8 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
// self->committedVersion by reporting commit version first before updating self->committedVersion. Otherwise, a
// client may get a commit version that the master is not aware of, and next GRV request may get a version less than
// self->committedVersion.
TEST(pProxyCommitData->committedVersion.get() > self->commitVersion); // A later version was reported committed first
TEST(pProxyCommitData->committedVersion.get() >
self->commitVersion); // A later version was reported committed first
if (self->commitVersion >= pProxyCommitData->committedVersion.get()) {
wait(pProxyCommitData->master.reportLiveCommittedVersion.getReply(
ReportRawCommittedVersionRequest(self->commitVersion,
@ -1430,11 +1431,25 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self,
return Void();
}
void maybeAddTssMapping(GetKeyServerLocationsReply& reply,
ProxyCommitData* commitData,
std::unordered_set<UID>& included,
UID ssId) {
if (!included.count(ssId)) {
auto mappingItr = commitData->tssMapping.find(ssId);
if (mappingItr != commitData->tssMapping.end()) {
included.insert(ssId);
reply.resultsTssMapping.push_back(*mappingItr);
}
}
}
ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
// We can't respond to these requests until we have valid txnStateStore
wait(commitData->validState.getFuture());
wait(delay(0, TaskPriority::DefaultEndpoint));
std::unordered_set<UID> tssMappingsIncluded;
GetKeyServerLocationsReply rep;
if (!req.end.present()) {
auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin)
@ -1443,6 +1458,7 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
ssis.reserve(r.value().src_info.size());
for (auto& it : r.value().src_info) {
ssis.push_back(it->interf);
maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
}
rep.results.push_back(std::make_pair(r.range(), ssis));
} else if (!req.reverse) {
@ -1454,6 +1470,7 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
ssis.reserve(r.value().src_info.size());
for (auto& it : r.value().src_info) {
ssis.push_back(it->interf);
maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
}
rep.results.push_back(std::make_pair(r.range(), ssis));
count++;
@ -1466,6 +1483,7 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
ssis.reserve(r.value().src_info.size());
for (auto& it : r.value().src_info) {
ssis.push_back(it->interf);
maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
}
rep.results.push_back(std::make_pair(r.range(), ssis));
if (r == commitData->keyInfo.ranges().begin()) {

View File

@ -628,7 +628,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
PromiseStream<UID> removedServers;
PromiseStream<UID> removedTSS;
std::set<UID> recruitingIds; // The IDs of the SS which are being recruited
std::set<UID> recruitingIds; // The IDs of the SS/TSS which are being recruited
std::set<NetworkAddress> recruitingLocalities;
Future<Void> initialFailureReactionDelay;
Future<Void> initializationDoneActor;
@ -4545,6 +4545,7 @@ struct TSSPairState : ReferenceCounted<TSSPairState>, NonCopyable {
Promise<Optional<std::pair<UID, Version>>>
ssPairInfo; // if set, for ss to pass its id to tss pair once it is successfully recruited
Promise<bool> tssPairDone; // if set, for tss to tell its ss pair whether it was successfully recruited
Promise<Void> complete;
Optional<Key> dcId; // dc
Optional<Key> dataHallId; // data hall
@ -4569,6 +4570,9 @@ struct TSSPairState : ReferenceCounted<TSSPairState>, NonCopyable {
if (tssPairDone.canBeSet()) {
tssPairDone.send(false);
}
if (complete.canBeSet()) {
complete.send(Void());
}
}
}
@ -4604,9 +4608,19 @@ struct TSSPairState : ReferenceCounted<TSSPairState>, NonCopyable {
return false;
}
bool markComplete() {
if (active && complete.canBeSet()) {
complete.send(Void());
return true;
}
return false;
}
Future<Optional<std::pair<UID, Version>>> waitOnSS() { return ssPairInfo.getFuture(); }
Future<bool> waitOnTSS() { return tssPairDone.getFuture(); }
Future<Void> waitComplete() { return complete.getFuture(); }
};
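The new complete promise is a one-shot done signal that the recruiter counts through an actor collection; a condensed sketch of the handshake, using the names from storageRecruiter below:

// Recruiter side: count this recruitment as in-progress until the state signals done.
addTSSInProgress.send(tssState->waitComplete());
// TSSPairState side: fire the signal exactly once, whether recruitment succeeded,
// failed, or was cancelled.
if (complete.canBeSet()) {
    complete.send(Void());
}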
ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
@ -4742,6 +4756,8 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
self->serverTrackerErrorOut,
newServer.get().addedVersion,
ddEnabledState);
// signal all done after adding tss to tracking info
tssState->markComplete();
}
} else {
TraceEvent(SevWarn, "DDRecruitmentError")
@ -4756,6 +4772,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
// SS and/or TSS recruitment failed at this point, update tssState
if (recruitTss && tssState->tssRecruitFailed()) {
tssState->markComplete();
TEST(true); // TSS recruitment failed for some reason
}
if (!recruitTss && tssState->ssRecruitFailed()) {
@ -4777,15 +4794,42 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
state std::map<AddressExclusion, int> numSSPerAddr;
// tss-specific recruitment state
state int32_t tssToRecruit = self->configuration.desiredTSSCount - db->get().client.tssMapping.size();
state int32_t targetTSSInDC = 0;
state int32_t tssToRecruit = 0;
state int inProgressTSSCount = 0;
state PromiseStream<Future<Void>> addTSSInProgress;
state Future<Void> inProgressTSS =
actorCollection(addTSSInProgress.getFuture(), &inProgressTSSCount, nullptr, nullptr, nullptr);
state Reference<TSSPairState> tssState = makeReference<TSSPairState>();
state Future<Void> checkKillTss = self->initialFailureReactionDelay;
state bool sleepingAfterKillTss = false;
state Future<Void> checkTss = self->initialFailureReactionDelay;
state bool pendingTSSCheck = false;
TraceEvent(SevDebug, "TSS_RecruitUpdated", self->distributorId).detail("Count", tssToRecruit);
loop {
try {
// Divide TSS evenly across DCs if there are multiple
// TODO would it be better to put all of them in the primary DC?
targetTSSInDC = self->configuration.desiredTSSCount;
if (self->configuration.usableRegions > 1) {
targetTSSInDC /= self->configuration.usableRegions;
if (self->primary) {
// put extras in primary DC if it's uneven
targetTSSInDC += (self->configuration.desiredTSSCount % self->configuration.usableRegions);
}
}
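// Worked example (illustrative): desiredTSSCount = 5, usableRegions = 2
// => each DC targets 5 / 2 = 2, and the primary absorbs the 5 % 2 = 1 remainder, for 3.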
int newTssToRecruit = targetTSSInDC - self->tss_info_by_pair.size() - inProgressTSSCount;
if (newTssToRecruit != tssToRecruit) {
TraceEvent("TSS_RecruitUpdated", self->distributorId).detail("Count", newTssToRecruit);
tssToRecruit = newTssToRecruit;
// if we need to get rid of some TSS processes, signal to either cancel recruitment or kill existing TSS
// processes
if (!pendingTSSCheck && (tssToRecruit < 0 || self->zeroHealthyTeams->get()) &&
(self->isTssRecruiting || (self->zeroHealthyTeams->get() && self->tss_info_by_pair.size() > 0))) {
checkTss = self->initialFailureReactionDelay;
}
}
numSSPerAddr.clear();
hasHealthyTeam = (self->healthyTeamCount != 0);
RecruitStorageRequest rsr;
@ -4870,7 +4914,9 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
self->isTssRecruiting = true;
tssState = makeReference<TSSPairState>(candidateWorker.worker.locality);
addTSSInProgress.send(tssState->waitComplete());
self->addActor.send(initializeStorage(self, candidateWorker, ddEnabledState, true, tssState));
checkTss = self->initialFailureReactionDelay;
} else {
if (tssState->active && tssState->inDataZone(candidateWorker.worker.locality)) {
TEST(true); // TSS recruits pair in same dc/datahall
@ -4883,7 +4929,6 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
initializeStorage(self, candidateWorker, ddEnabledState, false, tssState));
// successfully started recruitment of pair, reset tss recruitment state
tssState = makeReference<TSSPairState>();
tssToRecruit--;
} else {
TEST(tssState->active); // TSS recruitment skipped potential pair because it's in a
// different dc/datahall
@ -4892,72 +4937,64 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
}
}
}
when(wait(db->onChange())) { // SOMEDAY: only if clusterInterface or tss changes?
when(wait(db->onChange())) { // SOMEDAY: only if clusterInterface changes?
fCandidateWorker = Future<RecruitStorageReply>();
int newTssToRecruit = self->configuration.desiredTSSCount - db->get().client.tssMapping.size();
if (newTssToRecruit != tssToRecruit) {
TraceEvent("TSS_RecruitUpdated", self->distributorId).detail("Count", newTssToRecruit);
tssToRecruit = newTssToRecruit;
}
if (self->isTssRecruiting && (tssToRecruit <= 0 || self->zeroHealthyTeams->get())) {
TEST(tssToRecruit <= 0); // tss recruitment cancelled due to too many TSS
TEST(self->zeroHealthyTeams->get()); // tss recruitment cancelled due to zero healthy teams
TraceEvent(SevWarn, "TSS_RecruitCancelled", self->distributorId)
.detail("Reason", tssToRecruit <= 0 ? "ConfigChange" : "ZeroHealthyTeams");
tssState->cancel();
tssState = makeReference<TSSPairState>();
self->isTssRecruiting = false;
} else if (!self->isTssRecruiting &&
(tssToRecruit < 0 ||
(self->zeroHealthyTeams->get() && db->get().client.tssMapping.size() > 0))) {
if (!sleepingAfterKillTss) {
checkKillTss = self->initialFailureReactionDelay;
}
}
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->isTssRecruiting && self->zeroHealthyTeams->get()) {
TEST(self->zeroHealthyTeams->get()); // tss recruitment cancelled due to zero healthy teams 2
if (!pendingTSSCheck && self->zeroHealthyTeams->get() &&
(self->isTssRecruiting || self->tss_info_by_pair.size() > 0)) {
checkTss = self->initialFailureReactionDelay;
}
}
when(wait(checkTss)) {
bool cancelTss = self->isTssRecruiting && (tssToRecruit < 0 || self->zeroHealthyTeams->get());
// Can't kill more TSS processes than we have. Kill 1 if there are zero healthy teams, otherwise kill enough
// to get back to the desired amount
int tssToKill = std::min((int)self->tss_info_by_pair.size(),
std::max(-tssToRecruit, self->zeroHealthyTeams->get() ? 1 : 0));
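// Worked example (illustrative): tssToRecruit = -2 with 3 paired TSS and healthy teams
// => tssToKill = min(3, max(2, 0)) = 2; with zero healthy teams and tssToRecruit = 0
// => tssToKill = min(3, max(0, 1)) = 1.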
if (cancelTss) {
TEST(tssToRecruit < 0); // tss recruitment cancelled due to too many TSS
TEST(self->zeroHealthyTeams->get()); // tss recruitment cancelled due to zero healthy teams
TraceEvent(SevWarn, "TSS_RecruitCancelled", self->distributorId)
.detail("Reason", "ZeroHealthyTeams");
.detail("Reason", tssToRecruit <= 0 ? "TooMany" : "ZeroHealthyTeams");
tssState->cancel();
tssState = makeReference<TSSPairState>();
self->isTssRecruiting = false;
} else if (!self->isTssRecruiting && self->zeroHealthyTeams->get() &&
db->get().client.tssMapping.size() > 0) {
if (!sleepingAfterKillTss) {
checkKillTss = self->initialFailureReactionDelay;
}
}
}
when(wait(checkKillTss)) {
int tssToKill = std::min((int)db->get().client.tssMapping.size(),
std::max(-tssToRecruit, self->zeroHealthyTeams->get() ? 1 : 0));
if (tssToKill > 0) {
for (int i = 0; i < tssToKill; i++) {
StorageServerInterface tssi = db->get().client.tssMapping[i].second;
if (self->shouldHandleServer(tssi) && self->server_and_tss_info.count(tssi.id())) {
TraceEvent(SevWarn, "TSS_DDKill", self->distributorId)
.detail("TSSID", tssi.id())
.detail("Reason",
self->zeroHealthyTeams->get() ? "ZeroHealthyTeams" : "ConfigChange");
pendingTSSCheck = true;
checkTss = delay(SERVER_KNOBS->TSS_DD_CHECK_INTERVAL);
} else if (tssToKill > 0) {
auto itr = self->tss_info_by_pair.begin();
for (int i = 0; i < tssToKill; i++, itr++) {
UID tssId = itr->second->id;
StorageServerInterface tssi = itr->second->lastKnownInterface;
Promise<Void> killPromise = self->server_and_tss_info[tssi.id()]->killTss;
if (self->shouldHandleServer(tssi) && self->server_and_tss_info.count(tssId)) {
Promise<Void> killPromise = itr->second->killTss;
if (killPromise.canBeSet()) {
TEST(tssToRecruit < 0); // Killing TSS due to too many TSS
TEST(self->zeroHealthyTeams->get()); // Killing TSS due to zero healthy teams
TraceEvent(SevWarn, "TSS_DDKill", self->distributorId)
.detail("TSSID", tssId)
.detail("Reason",
self->zeroHealthyTeams->get() ? "ZeroHealthyTeams" : "TooMany");
killPromise.send(Void());
}
}
}
// If we're killing a TSS because of zero healthy teams, wait a bit to give the replacing SS a
// chance to join teams and stuff before killing another TSS
sleepingAfterKillTss = true;
checkKillTss = delay(SERVER_KNOBS->TSS_DD_KILL_INTERVAL);
pendingTSSCheck = true;
checkTss = delay(SERVER_KNOBS->TSS_DD_CHECK_INTERVAL);
} else if (self->isTssRecruiting) {
// check again later in case we need to cancel recruitment
pendingTSSCheck = true;
checkTss = delay(SERVER_KNOBS->TSS_DD_CHECK_INTERVAL);
// FIXME: better way to do this than timer?
} else {
sleepingAfterKillTss = false;
checkKillTss = Never();
pendingTSSCheck = false;
checkTss = Never();
}
}
when(wait(self->restartRecruiting.onTrigger())) {}
@ -5622,6 +5659,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
if (err.code() != error_code_movekeys_conflict) {
throw err;
}
bool ddEnabled = wait(isDataDistributionEnabled(cx, ddEnabledState));
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if (ddEnabled) {

View File

@ -219,7 +219,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( STORAGE_RECRUITMENT_DELAY, 10.0 );
init( TSS_HACK_IDENTITY_MAPPING, false ); // THIS SHOULD NEVER BE SET IN PROD. Only for performance testing
init( TSS_RECRUITMENT_TIMEOUT, 3*STORAGE_RECRUITMENT_DELAY ); if (randomize && BUGGIFY ) TSS_RECRUITMENT_TIMEOUT = 1.0; // Super low timeout should cause tss recruitments to fail
init( TSS_DD_KILL_INTERVAL, 60.0 ); if (randomize && BUGGIFY ) TSS_DD_KILL_INTERVAL = 1.0; // May kill all TSS quickly
init( TSS_DD_CHECK_INTERVAL, 60.0 ); if (randomize && BUGGIFY ) TSS_DD_CHECK_INTERVAL = 1.0; // May kill all TSS quickly
init( DATA_DISTRIBUTION_LOGGING_INTERVAL, 5.0 );
init( DD_ENABLED_CHECK_DELAY, 1.0 );
init( DD_STALL_CHECK_DELAY, 0.4 ); //Must be larger than 2*MAX_BUGGIFIED_DELAY

View File

@ -169,7 +169,7 @@ public:
double STORAGE_RECRUITMENT_DELAY;
bool TSS_HACK_IDENTITY_MAPPING;
double TSS_RECRUITMENT_TIMEOUT;
double TSS_DD_KILL_INTERVAL;
double TSS_DD_CHECK_INTERVAL;
double DATA_DISTRIBUTION_LOGGING_INTERVAL;
double DD_ENABLED_CHECK_DELAY;
double DD_STALL_CHECK_DELAY;

View File

@ -20,11 +20,11 @@
#include "flow/Util.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/DatabaseContext.h" // for tss mapping
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TSSMappingUtil.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using std::max;
@ -322,6 +322,7 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
MoveKeysLock lock,
FlowLock* startMoveKeysLock,
UID relocationIntervalId,
std::map<UID, StorageServerInterface>* tssMapping,
const DDEnabledState* ddEnabledState) {
state TraceInterval interval("RelocateShard_StartMoveKeys");
state Future<Void> warningLogger = logWarningAfter("StartMoveKeysTooLong", 600, servers);
@ -329,6 +330,7 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
wait(startMoveKeysLock->take(TaskPriority::DataDistributionLaunch));
state FlowLock::Releaser releaser(*startMoveKeysLock);
state bool loadedTssMapping = false;
TraceEvent(SevDebug, interval.begin(), relocationIntervalId);
@ -365,6 +367,12 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
wait(checkMoveKeysLock(&(tr->getTransaction()), lock, ddEnabledState));
if (!loadedTssMapping) {
// share transaction for loading tss mapping with the rest of start move keys
wait(readTSSMappingRYW(tr, tssMapping));
loadedTssMapping = true;
}
vector<Future<Optional<Value>>> serverListEntries;
serverListEntries.reserve(servers.size());
for (int s = 0; s < servers.size(); s++)
@ -547,7 +555,8 @@ ACTOR Future<Void> checkFetchingState(Database cx,
vector<UID> dest,
KeyRange keys,
Promise<Void> dataMovementComplete,
UID relocationIntervalId) {
UID relocationIntervalId,
std::map<UID, StorageServerInterface> tssMapping) {
state Transaction tr(cx);
loop {
@ -565,7 +574,6 @@ ACTOR Future<Void> checkFetchingState(Database cx,
state vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
vector<Future<Void>> requests;
state vector<Future<Void>> tssRequests;
ClientDBInfo clientInfo = cx->clientInfo->get();
for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
// FIXME: Is this the right behavior? dataMovementComplete will never be sent!
@ -577,10 +585,10 @@ ACTOR Future<Void> checkFetchingState(Database cx,
requests.push_back(
waitForShardReady(si, keys, tr.getReadVersion().get(), GetShardStateRequest::FETCHING));
Optional<StorageServerInterface> tssPair = clientInfo.getTssPair(si.id());
if (tssPair.present()) {
auto tssPair = tssMapping.find(si.id());
if (tssPair != tssMapping.end()) {
tssRequests.push_back(waitForShardReady(
tssPair.get(), keys, tr.getReadVersion().get(), GetShardStateRequest::FETCHING));
tssPair->second, keys, tr.getReadVersion().get(), GetShardStateRequest::FETCHING));
}
}
@ -617,6 +625,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
FlowLock* finishMoveKeysParallelismLock,
bool hasRemote,
UID relocationIntervalId,
std::map<UID, StorageServerInterface> tssMapping,
const DDEnabledState* ddEnabledState) {
state TraceInterval interval("RelocateShard_FinishMoveKeys");
state TraceInterval waitInterval("");
@ -626,9 +635,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
state int retries = 0;
state FlowLock::Releaser releaser;
// for killing tss if any get stuck during movekeys
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
state std::vector<StorageServerInterface> tssToKill;
state std::vector<std::pair<UID, UID>> tssToKill;
state std::unordered_set<UID> tssToIgnore;
// try waiting for tss for 2 loops; give up if they're stuck, so as not to affect the rest of the cluster
state int waitForTSSCounter = 2;
@ -658,33 +665,13 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
// (and don't want to add bugs) by changing whole method to RYW. Also, using a different
// transaction makes it commit earlier which we may need to guarantee causality of tss getting
// removed before client sends a request to this key range on the new SS
state Reference<ReadYourWritesTransaction> tssTr =
makeReference<ReadYourWritesTransaction>(occ);
loop {
try {
tssTr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tssTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
for (auto& tss : tssToKill) {
// DO NOT remove server list key - that'll break a bunch of stuff. DD will
// eventually call removeStorageServer
wait(removeTSSPairsFromCluster(occ, tssToKill));
tssTr->clear(serverTagKeyFor(tss.id()));
tssMapDB.erase(tssTr, tss.tssPairID.get());
}
tssTr->set(tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
wait(tssTr->commit());
for (auto& tss : tssToKill) {
TraceEvent(SevWarnAlways, "TSS_KillMoveKeys").detail("TSSID", tss.id().toString());
tssToIgnore.insert(tss.id());
}
tssToKill.clear();
break;
} catch (Error& e) {
wait(tssTr->onError(e));
}
for (auto& tssPair : tssToKill) {
TraceEvent(SevWarnAlways, "TSS_KillMoveKeys").detail("TSSID", tssPair.second);
tssToIgnore.insert(tssPair.second);
}
tssToKill.clear();
}
tr.info.taskID = TaskPriority::MoveKeys;
@ -861,9 +848,6 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
// update client info in case tss mapping changed or server got updated
// Use most up to date version of tss mapping
ClientDBInfo clientInfo = occ->clientInfo->get();
// Wait for new destination servers to fetch the keys
serverReady.reserve(storageServerInterfaces.size());
@ -875,13 +859,13 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
tr.getReadVersion().get(),
GetShardStateRequest::READABLE));
Optional<StorageServerInterface> tssPair =
clientInfo.getTssPair(storageServerInterfaces[s].id());
auto tssPair = tssMapping.find(storageServerInterfaces[s].id());
if (tssPair.present() && waitForTSSCounter > 0 && !tssToIgnore.count(tssPair.get().id())) {
tssReadyInterfs.push_back(tssPair.get());
if (tssPair != tssMapping.end() && waitForTSSCounter > 0 &&
!tssToIgnore.count(tssPair->second.id())) {
tssReadyInterfs.push_back(tssPair->second);
tssReady.push_back(waitForShardReady(
tssPair.get(), keys, tr.getReadVersion().get(), GetShardStateRequest::READABLE));
tssPair->second, keys, tr.getReadVersion().get(), GetShardStateRequest::READABLE));
}
}
@ -918,7 +902,8 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
if (anyTssNotDone && waitForTSSCounter == 0) {
for (int i = 0; i < tssReady.size(); i++) {
if (!tssReady[i].isReady() || tssReady[i].isError()) {
tssToKill.push_back(tssReadyInterfs[i]);
tssToKill.push_back(
std::pair(tssReadyInterfs[i].tssPairID.get(), tssReadyInterfs[i].id()));
}
}
// repeat loop and go back to start to kill the stuck TSS processes before continuing on
@ -1080,7 +1065,6 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServe
}
tssMapDB.set(tr, server.tssPairID.get(), server.id());
tr->set(tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
} else {
int8_t maxTagLocality = 0;
@ -1143,7 +1127,6 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServe
// THIS SHOULD NEVER BE ENABLED IN ANY NON-TESTING ENVIRONMENT
TraceEvent(SevError, "TSSIdentityMappingEnabled");
tssMapDB.set(tr, server.id(), server.id());
tr->set(tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
}
}
@ -1269,10 +1252,8 @@ ACTOR Future<Void> removeStorageServer(Database cx,
// THIS SHOULD NEVER BE ENABLED IN ANY NON-TESTING ENVIRONMENT
TraceEvent(SevError, "TSSIdentityMappingEnabled");
tssMapDB.erase(tr, serverID);
tr->set(tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
} else if (tssPairID.present()) {
tssMapDB.erase(tr, tssPairID.get());
tr->set(tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
}
retry = true;
@ -1374,11 +1355,20 @@ ACTOR Future<Void> moveKeys(Database cx,
const DDEnabledState* ddEnabledState) {
ASSERT(destinationTeam.size());
std::sort(destinationTeam.begin(), destinationTeam.end());
wait(startMoveKeys(
cx, keys, destinationTeam, lock, startMoveKeysParallelismLock, relocationIntervalId, ddEnabledState));
state std::map<UID, StorageServerInterface> tssMapping;
wait(startMoveKeys(cx,
keys,
destinationTeam,
lock,
startMoveKeysParallelismLock,
relocationIntervalId,
&tssMapping,
ddEnabledState));
state Future<Void> completionSignaller =
checkFetchingState(cx, healthyDestinations, keys, dataMovementComplete, relocationIntervalId);
checkFetchingState(cx, healthyDestinations, keys, dataMovementComplete, relocationIntervalId, tssMapping);
wait(finishMoveKeys(cx,
keys,
@ -1387,6 +1377,7 @@ ACTOR Future<Void> moveKeys(Database cx,
finishMoveKeysParallelismLock,
hasRemote,
relocationIntervalId,
tssMapping,
ddEnabledState));
// This is defensive, but make sure that we always say that the movement is complete before moveKeys completes
@ -1428,8 +1419,6 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, vector<StorageServ
// hack key-backed map here since we can't really change CommitTransactionRef to a RYW transaction
Key uidRef = Codec<UID>::pack(s.id()).pack();
tr.set(arena, uidRef.withPrefix(tssMappingKeys.begin), uidRef);
// tssMapDB.set(tr, server.id(), server.id());
tr.set(arena, tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
}
}

View File

@ -158,6 +158,7 @@ struct ProxyCommitData {
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Reference<StorageInfo>> storageCache;
std::unordered_map<UID, StorageServerInterface> tssMapping;
std::map<Tag, Version> tag_popped;
Deque<std::pair<Version, Version>> txsPopVersions;
Version lastTxsPop;

View File

@ -308,9 +308,13 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncV
.detail("SS", servers[i].id());
throw attribute_not_found();
}
messages.push_back(timeoutError(itr->second.eventLogRequest.getReply(
EventLogRequest(StringRef(servers[i].id().toString() + "/StorageMetrics"))),
1.0));
// Ignore TSS in add delay mode since it can purposefully freeze forever
if (!servers[i].isTss() || !g_network->isSimulated() ||
g_simulator.tssMode != ISimulator::TSSMode::EnabledAddDelay) {
messages.push_back(timeoutError(itr->second.eventLogRequest.getReply(EventLogRequest(
StringRef(servers[i].id().toString() + "/StorageMetrics"))),
1.0));
}
}
wait(waitForAll(messages));

View File

@ -0,0 +1,88 @@
/*
* TSSMappingUtil.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/SystemData.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbserver/TSSMappingUtil.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// TODO should I just change back to not use KeyBackedMap at this point?
/*ACTOR Future<std::map<UID, StorageServerInterface>> readTSSMapping(Database cx) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
state std::map<UID, StorageServerInterface> mapping;
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
readTSSMappingRYW(tr, &mapping);
return mapping;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}*/
ACTOR Future<Void> readTSSMappingRYW(Reference<ReadYourWritesTransaction> tr, std::map<UID, StorageServerInterface>* tssMapping) {
KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
state std::vector<std::pair<UID, UID>> uidMapping = wait(tssMapDB.getRange(tr, UID(), Optional<UID>(), CLIENT_KNOBS->TOO_MANY));
ASSERT(uidMapping.size() < CLIENT_KNOBS->TOO_MANY);
for (auto& it : uidMapping) {
state UID ssId = it.first;
Optional<Value> v = wait(tr->get(serverListKeyFor(it.second)));
(*tssMapping)[ssId] = decodeServerListValue(v.get());
}
return Void();
}
ACTOR Future<Void> readTSSMapping(Transaction* tr, std::map<UID, StorageServerInterface>* tssMapping) {
state RangeResult mappingList = wait(tr->getRange(tssMappingKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!mappingList.more && mappingList.size() < CLIENT_KNOBS->TOO_MANY);
for (auto& it : mappingList) {
state UID ssId = Codec<UID>::unpack(Tuple::unpack(it.key.removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(it.value));
Optional<Value> v = wait(tr->get(serverListKeyFor(tssId)));
(*tssMapping)[ssId] = decodeServerListValue(v.get());
}
return Void();
}
ACTOR Future<Void> removeTSSPairsFromCluster(Database cx, vector<std::pair<UID, UID>> pairsToRemove) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
loop {
try {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
for (auto& tssPair : pairsToRemove) {
// DO NOT remove server list key - that'll break a bunch of stuff. DD will eventually call removeStorageServer
tr->clear(serverTagKeyFor(tssPair.second));
tssMapDB.erase(tr, tssPair.first);
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
return Void();
}
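A usage sketch for the new readTSSMapping utility, condensed from the retry loop the consistency check workload adopts below:

state Transaction tr(cx);
state std::map<UID, StorageServerInterface> tssMapping;
loop {
    try {
        tssMapping.clear();
        wait(readTSSMapping(&tr, &tssMapping)); // fills ssId -> tss interface
        break;
    } catch (Error& e) {
        wait(tr.onError(e));
    }
}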

View File

@ -0,0 +1,36 @@
/*
* TSSMappingUtil.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef TSS_MAPPING_UTIL_SERVER_H
#define TSS_MAPPING_UTIL_SERVER_H
#pragma once
#include "fdbclient/StorageServerInterface.h"
// TODO unused
// Future<std::map<UID, StorageServerInterface>> readTSSMapping(Database cx);
Future<Void> readTSSMappingRYW(Reference<ReadYourWritesTransaction> const& tr, std::map<UID, StorageServerInterface>* const& tssMapping);
Future<Void> readTSSMapping(Transaction* const& tr, std::map<UID, StorageServerInterface>* const& tssMapping);
Future<Void> removeTSSPairsFromCluster(Database const& cx, vector<std::pair<UID, UID>> const& pairsToRemove);
#endif

View File

@ -719,9 +719,9 @@ public:
fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc),
readsRejected("ReadsRejected", cc), fetchedVersions("FetchedVersions", cc),
fetchesFromLogs("FetchesFromLogs", cc), readLatencySample("ReadLatencyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; });
specialCounter(cc, "Version", [self]() { return self->version.get(); });
@ -3254,6 +3254,10 @@ private:
(m.type == MutationRef::ClearRange && (matchesThisServer || (data->isTss() && matchesTssPair)))) {
throw worker_removed();
}
if (!data->isTss() && m.type == MutationRef::ClearRange && data->ssPairID.present() &&
serverTagKey == data->ssPairID.get()) {
data->clearSSWithTssPair();
}
} else if (m.type == MutationRef::SetValue && m.param1 == rebootWhenDurablePrivateKey) {
data->rebootAfterDurableVersion = currentVersion;
TraceEvent("RebootWhenDurableSet", data->thisServerID)
@ -3263,6 +3267,13 @@ private:
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(m.param2, Unversioned());
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, persistPrimaryLocality, m.param2));
} else if (m.type == MutationRef::SetValue && m.param1.substr(1).startsWith(tssMappingKeys.begin)) {
if (!data->isTss()) {
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.substr(1).removePrefix(tssMappingKeys.begin)));
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
ASSERT(ssId == data->thisServerID);
data->setSSWithTssPair(tssId);
}
} else {
ASSERT(false); // Unknown private mutation
}
@ -3588,8 +3599,9 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
data->sourceTLogID = curSourceTLogID;
TraceEvent("StorageServerSourceTLogID", data->thisServerID)
.detail("SourceTLogID", data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown")
.trackLatest(data->thisServerID.toString() + "/StorageServerSourceTLogID");
.detail("SourceTLogID",
data->sourceTLogID.present() ? data->sourceTLogID.get().toString() : "unknown")
.trackLatest(data->thisServerID.toString() + "/StorageServerSourceTLogID");
}
data->noRecentUpdates.set(false);
@ -4678,18 +4690,6 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
}
}
}
// SS monitors tss mapping here to see if it has a tss pair.
// This information is only used for ss/tss pair metrics reporting so it's ok to be eventually
// consistent.
if (!self->isTss()) {
ClientDBInfo clientInfo = self->db->get().client;
Optional<StorageServerInterface> myTssPair = clientInfo.getTssPair(self->thisServerID);
if (myTssPair.present()) {
self->setSSWithTssPair(myTssPair.get().id());
} else {
self->clearSSWithTssPair();
}
}
}
when(GetShardStateRequest req = waitNext(ssi.getShardState.getFuture())) {
if (req.mode == GetShardStateRequest::NO_WAIT) {
@ -4831,6 +4831,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
rep.addedVersion = self.version.get();
recruitReply.send(rep);
self.byteSampleRecovery = Void();
wait(storageServerCore(&self, ssi));
throw internal_error();
@ -4964,9 +4965,7 @@ ACTOR Future<Void> replaceTSSInterface(StorageServer* self, StorageServerInterfa
tr->set(serverListKeyFor(ssi.id()), serverListValue(ssi));
// add itself back to tss mapping
// tr->set(tssMappingKeyFor(self->tssPairID.get()), tssMappingValueFor(ssi.id()));
tssMapDB.set(tr, self->tssPairID.get(), ssi.id());
tr->set(tssMappingChangeKey, deterministicRandom()->randomUniqueID().toString());
wait(tr->commit());
self->tag = myTag;

View File

@ -32,6 +32,7 @@
#include "fdbserver/StorageMetrics.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/TSSMappingUtil.h"
#include "flow/DeterministicRandom.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/StorageServerInterface.h"
@ -209,11 +210,16 @@ struct ConsistencyCheckWorkload : TestWorkload {
if (self->firstClient || self->distributed) {
try {
state DatabaseConfiguration configuration;
state std::map<UID, StorageServerInterface> tssMapping;
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
loop {
try {
if (self->performTSSCheck) {
tssMapping.clear();
wait(readTSSMapping(&tr, &tssMapping));
}
RangeResult res = wait(tr.getRange(configKeys, 1000));
if (res.size() == 1000) {
TraceEvent("ConsistencyCheck_TooManyConfigOptions");
@ -286,7 +292,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
throw;
}
wait(::success(self->checkForStorage(cx, configuration, self)));
wait(::success(self->checkForStorage(cx, configuration, tssMapping, self)));
wait(::success(self->checkForExtraDataStores(cx, self)));
// Check that each machine is operating as its desired class
@ -317,7 +323,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
state Standalone<VectorRef<KeyValueRef>> keyLocations = keyLocationPromise.getFuture().get();
// Check that each shard has the same data on all storage servers that it resides on
wait(::success(self->checkDataConsistency(cx, keyLocations, configuration, self)));
wait(::success(self->checkDataConsistency(cx, keyLocations, configuration, tssMapping, self)));
// Cache consistency check
if (self->performCacheCheck)
@ -1124,6 +1130,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
ACTOR Future<bool> checkDataConsistency(Database cx,
VectorRef<KeyValueRef> keyLocations,
DatabaseConfiguration configuration,
std::map<UID, StorageServerInterface> tssMapping,
ConsistencyCheckWorkload* self) {
// Stores the total number of bytes on each storage server
// In a distributed test, this will be an estimated size
@ -1250,10 +1257,11 @@ struct ConsistencyCheckWorkload : TestWorkload {
if (!isRelocating && self->performTSSCheck) {
int initialSize = storageServers.size();
for (int i = 0; i < initialSize; i++) {
Optional<StorageServerInterface> tssPair = cx->clientInfo->get().getTssPair(storageServers[i]);
if (tssPair.present()) {
storageServers.push_back(tssPair.get().id());
storageServerInterfaces.push_back(tssPair.get());
auto tssPair = tssMapping.find(storageServers[i]);
if (tssPair != tssMapping.end()) {
TEST(true); // TSS checked in consistency check
storageServers.push_back(tssPair->second.id());
storageServerInterfaces.push_back(tssPair->second);
}
}
}
@ -1491,7 +1499,9 @@ struct ConsistencyCheckWorkload : TestWorkload {
// All shards should be available in quiescence
if (self->performQuiescentChecks &&
(g_network->isSimulated() || !storageServerInterfaces[j].isTss())) {
((g_network->isSimulated() &&
g_simulator.tssMode != ISimulator::TSSMode::EnabledAddDelay) ||
!storageServerInterfaces[j].isTss())) {
self->testFailure("Storage server unavailable");
return false;
}
@ -1746,6 +1756,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
// Returns false if any worker that should have a storage server does not have one
ACTOR Future<bool> checkForStorage(Database cx,
DatabaseConfiguration configuration,
std::map<UID, StorageServerInterface> tssMapping,
ConsistencyCheckWorkload* self) {
state vector<WorkerDetails> workers = wait(getWorkers(self->dbInfo));
state vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
@ -1786,8 +1797,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
(configuration.regions.size() == 2 && configuration.usableRegions > 1 && (missingDc0 || missingDc1))) {
// TODO could improve this check by also ensuring DD is currently recruiting a TSS by using quietdb?
bool couldExpectMissingTss =
(configuration.desiredTSSCount - self->dbInfo->get().client.tssMapping.size()) > 0;
bool couldExpectMissingTss = (configuration.desiredTSSCount - tssMapping.size()) > 0;
int countMissing = missingStorage.size();
int acceptableTssMissing = 1;