Build a TenantCache for use by DD (#7207)
* Add an DD tenant-cache-assembly actor * Add basic tenant list monitoring for tenant cache. * Update DD tenant cache refresh to be more efficient and unit-testable * Remove the DD prefix in the tenant cache class name (and associated impl and UT class names); there is nothing specific to DD in it; DD uses it; other modules may use it in the future * Disable DD tenant awareness by default
This commit is contained in:
parent
9ca8a3c683
commit
8cf2be030f
|
@ -286,6 +286,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY, 120 ); if( randomize && BUGGIFY ) DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY = 5;
|
||||
init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 10 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 1000;
|
||||
init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 20 );
|
||||
init( DD_TENANT_AWARENESS_ENABLED, false );
|
||||
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2.0 );
|
||||
|
||||
// TeamRemover
|
||||
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
|
||||
|
|
|
@ -232,6 +232,8 @@ public:
|
|||
int DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY;
|
||||
int DD_STORAGE_WIGGLE_PAUSE_THRESHOLD; // How many unhealthy relocations are ongoing will pause storage wiggle
|
||||
int DD_STORAGE_WIGGLE_STUCK_THRESHOLD; // How many times bestTeamStuck accumulate will pause storage wiggle
|
||||
bool DD_TENANT_AWARENESS_ENABLED;
|
||||
int TENANT_CACHE_LIST_REFRESH_INTERVAL; // How often the TenantCache is refreshed
|
||||
|
||||
// TeamRemover to remove redundant teams
|
||||
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor
|
||||
|
|
|
@ -164,6 +164,8 @@ set(FDBSERVER_SRCS
|
|||
TCInfo.h
|
||||
template_fdb.h
|
||||
tester.actor.cpp
|
||||
TenantCache.actor.cpp
|
||||
TenantCache.h
|
||||
TesterInterface.actor.h
|
||||
TLogInterface.h
|
||||
TLogServer.actor.cpp
|
||||
|
|
|
@ -3830,7 +3830,8 @@ int DDTeamCollection::overlappingMachineMembers(std::vector<Standalone<StringRef
|
|||
void DDTeamCollection::addTeam(const std::vector<Reference<TCServerInfo>>& newTeamServers,
|
||||
IsInitialTeam isInitialTeam,
|
||||
IsRedundantTeam redundantTeam) {
|
||||
auto teamInfo = makeReference<TCTeamInfo>(newTeamServers);
|
||||
Optional<Reference<TCTenantInfo>> no_tenant = {};
|
||||
auto teamInfo = makeReference<TCTeamInfo>(newTeamServers, no_tenant);
|
||||
|
||||
// Move satisfiesPolicy to the end for performance benefit
|
||||
auto badTeam = IsBadTeam{ redundantTeam || teamInfo->size() != configuration.storageTeamSize ||
|
||||
|
|
|
@ -49,6 +49,7 @@
|
|||
#include "flow/UnitTest.h"
|
||||
|
||||
class TCTeamInfo;
|
||||
class TCTenantInfo;
|
||||
class TCMachineInfo;
|
||||
class TCMachineTeamInfo;
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
|
@ -28,6 +29,7 @@
|
|||
#include "fdbclient/RunTransaction.actor.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbrpc/Replication.h"
|
||||
#include "fdbserver/DataDistribution.actor.h"
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
|
@ -37,6 +39,7 @@
|
|||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/QuietDatabase.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/TenantCache.h"
|
||||
#include "fdbserver/TLogInterface.h"
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
|
@ -513,6 +516,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
|
|||
state Reference<DDTeamCollection> primaryTeamCollection;
|
||||
state Reference<DDTeamCollection> remoteTeamCollection;
|
||||
state bool trackerCancelled;
|
||||
state bool ddIsTenantAware = SERVER_KNOBS->DD_TENANT_AWARENESS_ENABLED;
|
||||
loop {
|
||||
trackerCancelled = false;
|
||||
|
||||
|
@ -597,6 +601,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
|
|||
// mode may be set true by system operator using fdbcli and isDDEnabled() set to true
|
||||
break;
|
||||
}
|
||||
|
||||
TraceEvent("DataDistributionDisabled", self->ddId).log();
|
||||
|
||||
TraceEvent("MovingData", self->ddId)
|
||||
|
@ -637,6 +642,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
|
|||
TraceEvent("DataDistributionEnabled").log();
|
||||
}
|
||||
|
||||
state Reference<TenantCache> ddTenantCache;
|
||||
if (ddIsTenantAware) {
|
||||
ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
|
||||
wait(ddTenantCache->build(cx));
|
||||
}
|
||||
|
||||
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
|
||||
ASSERT(configuration.storageTeamSize > 0);
|
||||
|
||||
|
@ -705,6 +716,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
|
|||
} else {
|
||||
anyZeroHealthyTeams = zeroHealthyTeams[0];
|
||||
}
|
||||
if (ddIsTenantAware) {
|
||||
actors.push_back(reportErrorsExcept(
|
||||
ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
|
||||
}
|
||||
|
||||
actors.push_back(pollMoveKeysLock(cx, lock, ddEnabledState));
|
||||
actors.push_back(reportErrorsExcept(dataDistributionTracker(initData,
|
||||
|
|
|
@ -307,9 +307,9 @@ std::string TCMachineTeamInfo::getMachineIDsStr() const {
|
|||
return std::move(ss).str();
|
||||
}
|
||||
|
||||
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers)
|
||||
: servers(servers), healthy(true), wrongConfiguration(false), priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY),
|
||||
id(deterministicRandom()->randomUniqueID()) {
|
||||
TCTeamInfo::TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers, Optional<Reference<TCTenantInfo>> tenant)
|
||||
: servers(servers), tenant(tenant), healthy(true), wrongConfiguration(false),
|
||||
priority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY), id(deterministicRandom()->randomUniqueID()) {
|
||||
if (servers.empty()) {
|
||||
TraceEvent(SevInfo, "ConstructTCTeamFromEmptyServers").log();
|
||||
}
|
||||
|
|
|
@ -20,9 +20,13 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/FastRef.h"
|
||||
|
||||
class TCTeamInfo;
|
||||
class TCTenantInfo;
|
||||
class TCMachineInfo;
|
||||
class TCMachineTeamInfo;
|
||||
|
||||
|
@ -166,6 +170,7 @@ class TCTeamInfo final : public ReferenceCounted<TCTeamInfo>, public IDataDistri
|
|||
friend class TCTeamInfoImpl;
|
||||
std::vector<Reference<TCServerInfo>> servers;
|
||||
std::vector<UID> serverIDs;
|
||||
Optional<Reference<TCTenantInfo>> tenant;
|
||||
bool healthy;
|
||||
bool wrongConfiguration; // True if any of the servers in the team have the wrong configuration
|
||||
int priority;
|
||||
|
@ -175,7 +180,9 @@ public:
|
|||
Reference<TCMachineTeamInfo> machineTeam;
|
||||
Future<Void> tracker;
|
||||
|
||||
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers);
|
||||
explicit TCTeamInfo(std::vector<Reference<TCServerInfo>> const& servers, Optional<Reference<TCTenantInfo>> tenant);
|
||||
|
||||
Optional<Reference<TCTenantInfo>>& getTenant() { return tenant; }
|
||||
|
||||
std::string getTeamID() const override { return id.shortString(); }
|
||||
|
||||
|
@ -235,3 +242,24 @@ private:
|
|||
|
||||
bool allServersHaveHealthyAvailableSpace() const;
|
||||
};
|
||||
|
||||
class TCTenantInfo : public ReferenceCounted<TCTenantInfo> {
|
||||
private:
|
||||
TenantInfo m_tenantInfo;
|
||||
Key m_prefix;
|
||||
std::vector<Reference<TCTeamInfo>> m_tenantTeams;
|
||||
int64_t m_cacheGeneration;
|
||||
|
||||
public:
|
||||
TCTenantInfo() { m_prefix = allKeys.end; }
|
||||
TCTenantInfo(TenantInfo tinfo, Key prefix) : m_tenantInfo(tinfo), m_prefix(prefix) {}
|
||||
std::vector<Reference<TCTeamInfo>>& teams() { return m_tenantTeams; }
|
||||
|
||||
TenantName name() { return m_tenantInfo.name.get(); }
|
||||
std::string prefixDesc() { return m_prefix.printable(); }
|
||||
|
||||
void addTeam(TCTeamInfo team);
|
||||
void removeTeam(TCTeamInfo team);
|
||||
void updateCacheGeneration(int64_t generation) { m_cacheGeneration = generation; }
|
||||
int64_t cacheGeneration() const { return m_cacheGeneration; }
|
||||
};
|
||||
|
|
|
@ -0,0 +1,304 @@
|
|||
/*
|
||||
* TenantCache.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "fdbserver/TenantCache.h"
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
class TenantCacheImpl {
|
||||
|
||||
ACTOR static Future<RangeResult> getTenantList(TenantCache* tenantCache, Transaction* tr) {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
|
||||
state Future<RangeResult> tenantList = tr->getRange(tenantMapKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
wait(success(tenantList));
|
||||
ASSERT(!tenantList.get().more && tenantList.get().size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
return tenantList.get();
|
||||
}
|
||||
|
||||
public:
|
||||
ACTOR static Future<Void> build(TenantCache* tenantCache) {
|
||||
state Transaction tr(tenantCache->dbcx());
|
||||
|
||||
TraceEvent(SevInfo, "BuildingTenantCache", tenantCache->id()).log();
|
||||
|
||||
try {
|
||||
state RangeResult tenantList = wait(getTenantList(tenantCache, &tr));
|
||||
|
||||
for (int i = 0; i < tenantList.size(); i++) {
|
||||
TenantName tname = tenantList[i].key.removePrefix(tenantMapPrefix);
|
||||
TenantMapEntry t = decodeTenantEntry(tenantList[i].value);
|
||||
|
||||
tenantCache->insert(tname, t);
|
||||
|
||||
TraceEvent(SevInfo, "TenantFound", tenantCache->id())
|
||||
.detail("TenantName", tname)
|
||||
.detail("TenantID", t.id)
|
||||
.detail("TenantPrefix", t.prefix);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
||||
TraceEvent(SevInfo, "BuiltTenantCache", tenantCache->id()).log();
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorTenantMap(TenantCache* tenantCache) {
|
||||
TraceEvent(SevInfo, "StartingTenantCacheMonitor", tenantCache->id()).log();
|
||||
|
||||
state Transaction tr(tenantCache->dbcx());
|
||||
|
||||
state double lastTenantListFetchTime = now();
|
||||
|
||||
loop {
|
||||
try {
|
||||
if (now() - lastTenantListFetchTime > (2 * SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL)) {
|
||||
TraceEvent(SevWarn, "TenantListRefreshDelay", tenantCache->id()).log();
|
||||
}
|
||||
|
||||
state RangeResult tenantList = wait(getTenantList(tenantCache, &tr));
|
||||
|
||||
tenantCache->startRefresh();
|
||||
bool tenantListUpdated = false;
|
||||
|
||||
for (int i = 0; i < tenantList.size(); i++) {
|
||||
TenantName tname = tenantList[i].key.removePrefix(tenantMapPrefix);
|
||||
TenantMapEntry t = decodeTenantEntry(tenantList[i].value);
|
||||
|
||||
if (tenantCache->update(tname, t)) {
|
||||
tenantListUpdated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (tenantCache->cleanup()) {
|
||||
tenantListUpdated = true;
|
||||
}
|
||||
|
||||
if (tenantListUpdated) {
|
||||
TraceEvent(SevInfo, "TenantCache", tenantCache->id()).detail("List", tenantCache->desc());
|
||||
}
|
||||
|
||||
lastTenantListFetchTime = now();
|
||||
tr.reset();
|
||||
wait(delay(SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL));
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled) {
|
||||
TraceEvent("TenantCacheGetTenantListError", tenantCache->id())
|
||||
.errorUnsuppressed(e)
|
||||
.suppressFor(1.0);
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void TenantCache::insert(TenantName& tenantName, TenantMapEntry& tenant) {
|
||||
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
|
||||
ASSERT(tenantCache.find(tenantPrefix) == tenantCache.end());
|
||||
|
||||
TenantInfo tenantInfo(tenantName, tenant.id);
|
||||
tenantCache[tenantPrefix] = makeReference<TCTenantInfo>(tenantInfo, tenant.prefix);
|
||||
tenantCache[tenantPrefix]->updateCacheGeneration(generation);
|
||||
}
|
||||
|
||||
void TenantCache::startRefresh() {
|
||||
ASSERT(generation < std::numeric_limits<uint64_t>::max());
|
||||
generation++;
|
||||
}
|
||||
|
||||
void TenantCache::keep(TenantName& tenantName, TenantMapEntry& tenant) {
|
||||
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
|
||||
|
||||
ASSERT(tenantCache.find(tenantPrefix) != tenantCache.end());
|
||||
tenantCache[tenantPrefix]->updateCacheGeneration(generation);
|
||||
}
|
||||
|
||||
bool TenantCache::update(TenantName& tenantName, TenantMapEntry& tenant) {
|
||||
KeyRef tenantPrefix(tenant.prefix.begin(), tenant.prefix.size());
|
||||
|
||||
if (tenantCache.find(tenantPrefix) != tenantCache.end()) {
|
||||
keep(tenantName, tenant);
|
||||
return false;
|
||||
}
|
||||
|
||||
insert(tenantName, tenant);
|
||||
return true;
|
||||
}
|
||||
|
||||
int TenantCache::cleanup() {
|
||||
int tenantsRemoved = 0;
|
||||
std::vector<Key> keysToErase;
|
||||
|
||||
for (auto& t : tenantCache) {
|
||||
ASSERT(t.value->cacheGeneration() <= generation);
|
||||
if (t.value->cacheGeneration() != generation) {
|
||||
keysToErase.push_back(t.key);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& k : keysToErase) {
|
||||
tenantCache.erase(k);
|
||||
tenantsRemoved++;
|
||||
}
|
||||
|
||||
return tenantsRemoved;
|
||||
}
|
||||
|
||||
std::string TenantCache::desc() const {
|
||||
std::string s("@Generation: ");
|
||||
s += std::to_string(generation) + " ";
|
||||
int count = 0;
|
||||
for (auto& [tenantPrefix, tenant] : tenantCache) {
|
||||
if (count) {
|
||||
s += ", ";
|
||||
}
|
||||
|
||||
s += "Name: " + tenant->name().toString() + " Prefix: " + tenantPrefix.printable();
|
||||
count++;
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
bool TenantCache::isTenantKey(KeyRef key) const {
|
||||
auto it = tenantCache.lastLessOrEqual(key);
|
||||
if (it == tenantCache.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!key.startsWith(it->key)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
Future<Void> TenantCache::build(Database cx) {
|
||||
return TenantCacheImpl::build(this);
|
||||
}
|
||||
|
||||
Future<Void> TenantCache::monitorTenantMap() {
|
||||
return TenantCacheImpl::monitorTenantMap(this);
|
||||
}
|
||||
|
||||
class TenantCacheUnitTest {
|
||||
public:
|
||||
ACTOR static Future<Void> InsertAndTestPresence() {
|
||||
wait(Future<Void>(Void()));
|
||||
|
||||
Database cx;
|
||||
TenantCache tenantCache(cx, UID(1, 0));
|
||||
|
||||
constexpr static uint16_t tenantLimit = 64;
|
||||
|
||||
uint16_t tenantCount = deterministicRandom()->randomInt(1, tenantLimit);
|
||||
uint16_t tenantNumber = deterministicRandom()->randomInt(0, std::numeric_limits<uint16_t>::max());
|
||||
|
||||
for (uint16_t i = 0; i < tenantCount; i++) {
|
||||
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantNumber + i));
|
||||
TenantMapEntry tenant(tenantNumber + i, KeyRef());
|
||||
|
||||
tenantCache.insert(tenantName, tenant);
|
||||
}
|
||||
|
||||
for (int i = 0; i < tenantLimit; i++) {
|
||||
Key k(format("%d", i));
|
||||
ASSERT(tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantNumber + (i % tenantCount)))));
|
||||
ASSERT(!tenantCache.isTenantKey(k.withPrefix(allKeys.begin)));
|
||||
ASSERT(!tenantCache.isTenantKey(k));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> RefreshAndTestPresence() {
|
||||
wait(Future<Void>(Void()));
|
||||
|
||||
Database cx;
|
||||
TenantCache tenantCache(cx, UID(1, 0));
|
||||
|
||||
constexpr static uint16_t tenantLimit = 64;
|
||||
|
||||
uint16_t tenantCount = deterministicRandom()->randomInt(1, tenantLimit);
|
||||
uint16_t tenantNumber = deterministicRandom()->randomInt(0, std::numeric_limits<uint16_t>::max());
|
||||
|
||||
for (uint16_t i = 0; i < tenantCount; i++) {
|
||||
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantNumber + i));
|
||||
TenantMapEntry tenant(tenantNumber + i, KeyRef());
|
||||
|
||||
tenantCache.insert(tenantName, tenant);
|
||||
}
|
||||
|
||||
uint16_t staleTenantFraction = deterministicRandom()->randomInt(1, 8);
|
||||
tenantCache.startRefresh();
|
||||
|
||||
int keepCount = 0, removeCount = 0;
|
||||
for (int i = 0; i < tenantCount; i++) {
|
||||
uint16_t tenantOrdinal = tenantNumber + i;
|
||||
|
||||
if (tenantOrdinal % staleTenantFraction != 0) {
|
||||
TenantName tenantName(format("%s_%08d", "ddtc_test_tenant", tenantOrdinal));
|
||||
TenantMapEntry tenant(tenantOrdinal, KeyRef());
|
||||
bool newTenant = tenantCache.update(tenantName, tenant);
|
||||
ASSERT(!newTenant);
|
||||
keepCount++;
|
||||
} else {
|
||||
removeCount++;
|
||||
}
|
||||
}
|
||||
int tenantsRemoved = tenantCache.cleanup();
|
||||
ASSERT(tenantsRemoved == removeCount);
|
||||
|
||||
int keptCount = 0, removedCount = 0;
|
||||
for (int i = 0; i < tenantCount; i++) {
|
||||
uint16_t tenantOrdinal = tenantNumber + i;
|
||||
Key k(format("%d", i));
|
||||
if (tenantOrdinal % staleTenantFraction != 0) {
|
||||
ASSERT(tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantOrdinal))));
|
||||
keptCount++;
|
||||
} else {
|
||||
ASSERT(!tenantCache.isTenantKey(k.withPrefix(TenantMapEntry::idToPrefix(tenantOrdinal))));
|
||||
removedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(keepCount == keptCount);
|
||||
ASSERT(removeCount == removedCount);
|
||||
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_CASE("/TenantCache/InsertAndTestPresence") {
|
||||
wait(TenantCacheUnitTest::InsertAndTestPresence());
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/TenantCache/RefreshAndTestPresence") {
|
||||
wait(TenantCacheUnitTest::RefreshAndTestPresence());
|
||||
return Void();
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* TenantCache.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "fdbserver/TCInfo.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/IndexedSet.h"
|
||||
#include <limits>
|
||||
#include <string>
|
||||
|
||||
typedef Map<KeyRef, Reference<TCTenantInfo>> TenantMapByPrefix;
|
||||
|
||||
class TenantCache : public ReferenceCounted<TenantCache> {
|
||||
friend class TenantCacheImpl;
|
||||
friend class TenantCacheUnitTest;
|
||||
|
||||
private:
|
||||
constexpr static uint64_t INVALID_GENERATION = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
UID distributorID;
|
||||
Database cx;
|
||||
uint64_t generation;
|
||||
TenantMapByPrefix tenantCache;
|
||||
|
||||
// mark the start of a new sweep of the tenant cache
|
||||
void startRefresh();
|
||||
|
||||
void insert(TenantName& tenantName, TenantMapEntry& tenant);
|
||||
void keep(TenantName& tenantName, TenantMapEntry& tenant);
|
||||
|
||||
// return true if a new tenant is inserted into the cache
|
||||
bool update(TenantName& tenantName, TenantMapEntry& tenant);
|
||||
|
||||
// return count of tenants that were found to be stale and removed from the cache
|
||||
int cleanup();
|
||||
|
||||
UID id() const { return distributorID; }
|
||||
|
||||
Database dbcx() const { return cx; }
|
||||
|
||||
public:
|
||||
TenantCache(Database cx, UID distributorID) : distributorID(distributorID), cx(cx) {
|
||||
generation = deterministicRandom()->randomUInt32();
|
||||
}
|
||||
|
||||
Future<Void> build(Database cx);
|
||||
|
||||
Future<Void> monitorTenantMap();
|
||||
|
||||
std::string desc() const;
|
||||
|
||||
bool isTenantKey(KeyRef key) const;
|
||||
};
|
Loading…
Reference in New Issue