Implement TenantCacheEntry in-memory cache (#7801)

* Implement TenantCacheEntry in-memory cache

Description

  diff-4: TraceEvent usage improvements 
  diff-3: Address review comments
  diff-2: Add APIs to read counter values, test improvements
  diff-1: Address review comments

Major changes includes:
1. Implements an actor that enables an in-memory caching of
TenantCacheEntry object, allowing the caller to embed custom
information along with TenantCacheEntry.
2. The cache follows read-through cache semantics where the entry
gets loaded from underlying database on a miss.
3. The cache implements a "periodic poller" to refresh known Tenants
by consulting the database. Once a database keyrange-watch feature is
available, cache shall be updated.

Bonus:
Implement a 'recurringAsync' addition to genericActors allowing caller
to schedule a periodic task registering an "actor functor"; the routine
'waits' for the actor unlike existing 'recurring' implementation.

Testing

TenantEntryCache workload
devCorrectnessRun - 100K
This commit is contained in:
Ata E Husain Bohra 2022-08-25 11:42:26 -07:00 committed by GitHub
parent d6b1ac056c
commit 00fe4863b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 774 additions and 22 deletions

View File

@ -294,7 +294,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( DD_STORAGE_WIGGLE_STUCK_THRESHOLD, 20 );
init( DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC, isSimulated ? 2 : 21 * 60 * 60 * 24 ); if(randomize && BUGGIFY) DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC = isSimulated ? 0: 120;
init( DD_TENANT_AWARENESS_ENABLED, false );
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2.0 );
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_LIST_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
// TeamRemover
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true

View File

@ -591,12 +591,13 @@ ACTOR Future<Void> refreshEncryptionKeysCore(Reference<EncryptKeyProxyData> ekpP
try {
KmsConnLookupEKsByDomainIdsReq req;
req.debugId = debugId;
req.encryptDomainInfos.reserve(req.arena, ekpProxyData->baseCipherDomainIdCache.size());
// req.encryptDomainInfos.reserve(req.arena, ekpProxyData->baseCipherDomainIdCache.size());
int64_t currTS = (int64_t)now();
for (auto itr = ekpProxyData->baseCipherDomainIdCache.begin();
itr != ekpProxyData->baseCipherDomainIdCache.end();) {
if (isCipherKeyEligibleForRefresh(itr->second, currTS)) {
TraceEvent("RefreshEKs").detail("Id", itr->first);
req.encryptDomainInfos.emplace_back_deep(req.arena, itr->first, itr->second.domainName);
}
@ -612,9 +613,9 @@ ACTOR Future<Void> refreshEncryptionKeysCore(Reference<EncryptKeyProxyData> ekpP
for (const auto& item : rep.cipherKeyDetails) {
const auto itr = ekpProxyData->baseCipherDomainIdCache.find(item.encryptDomainId);
if (itr == ekpProxyData->baseCipherDomainIdCache.end()) {
TraceEvent(SevError, "RefreshEKs_DomainIdNotFound", ekpProxyData->myId)
TraceEvent(SevInfo, "RefreshEKs_DomainIdNotFound", ekpProxyData->myId)
.detail("DomainId", item.encryptDomainId);
// Continue updating the cache with othe elements
// Continue updating the cache with other elements
continue;
}
@ -637,7 +638,7 @@ ACTOR Future<Void> refreshEncryptionKeysCore(Reference<EncryptKeyProxyData> ekpP
ekpProxyData->baseCipherKeysRefreshed += rep.cipherKeyDetails.size();
t.detail("nKeys", rep.cipherKeyDetails.size());
t.detail("NumKeys", rep.cipherKeyDetails.size());
} catch (Error& e) {
if (!canReplyWith(e)) {
TraceEvent(SevWarn, "RefreshEKs_Error").error(e);
@ -650,8 +651,8 @@ ACTOR Future<Void> refreshEncryptionKeysCore(Reference<EncryptKeyProxyData> ekpP
return Void();
}
void refreshEncryptionKeys(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
Future<Void> ignored = refreshEncryptionKeysCore(ekpProxyData, kmsConnectorInf);
Future<Void> refreshEncryptionKeys(Reference<EncryptKeyProxyData> ekpProxyData, KmsConnectorInterface kmsConnectorInf) {
return refreshEncryptionKeysCore(ekpProxyData, kmsConnectorInf);
}
ACTOR Future<Void> getLatestBlobMetadata(Reference<EncryptKeyProxyData> ekpProxyData,
@ -813,9 +814,11 @@ ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface,
// FLOW_KNOB->ENCRRYPTION_KEY_REFRESH_INTERVAL_SEC, allowing the interactions with external Encryption Key Manager
// mostly not co-inciding with FDB process encryption key refresh attempts.
self->encryptionKeyRefresher = recurring([&]() { refreshEncryptionKeys(self, kmsConnectorInf); },
FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL,
TaskPriority::Worker);
self->encryptionKeyRefresher = recurringAsync([&]() { return refreshEncryptionKeys(self, kmsConnectorInf); },
FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL, /* interval */
true, /* absoluteIntervalDelay */
FLOW_KNOBS->ENCRYPT_KEY_REFRESH_INTERVAL, /* initialDelay */
TaskPriority::Worker);
self->blobMetadataRefresher = recurring([&]() { refreshBlobMetadata(self, kmsConnectorInf); },
SERVER_KNOBS->BLOB_METADATA_REFRESH_INTERVAL,

View File

@ -33,7 +33,6 @@
#include "fdbclient/Tenant.h"
#include "fdbserver/ServerDBInfo.h"
#include "flow/flow.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -31,9 +31,9 @@
#include "fdbrpc/Stats.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/ResolverInterface.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "flow/IRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -0,0 +1,390 @@
/*
* TenantEntryCache.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_TENANTENTRYCACHE_ACTOR_G_H)
#define FDBSERVER_TENANTENTRYCACHE_ACTOR_G_H
#include "fdbserver/TenantEntryCache.actor.g.h"
#elif !defined(FDBSERVER_TENANTENTRYCACHE_ACTOR_H)
#define FDBSERVER_TENANTENTRYCACHE_ACTOR_H
#pragma once
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/Tenant.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbrpc/TenantName.h"
#include "flow/IndexedSet.h"
#include <functional>
#include <unordered_map>
#include "flow/actorcompiler.h" // has to be last include
using TenantNameEntryPair = std::pair<TenantName, TenantMapEntry>;
using TenantNameEntryPairVec = std::vector<TenantNameEntryPair>;
enum class TenantEntryCacheRefreshReason { INIT = 1, PERIODIC_TASK = 2, CACHE_MISS = 3, REMOVE_ENTRY = 4 };
enum class TenantEntryCacheRefreshMode { PERIODIC_TASK = 1, NONE = 2 };
template <class T>
struct TenantEntryCachePayload {
TenantName name;
TenantMapEntry entry;
// Custom client payload
T payload;
};
template <class T>
using TenantEntryCachePayloadFunc = std::function<TenantEntryCachePayload<T>(const TenantName&, const TenantMapEntry&)>;
// In-memory cache for TenantEntryMap objects. It supports three indices:
// 1. Lookup by 'TenantId'
// 2. Lookup by 'TenantPrefix'
// 3. Lookup by 'TenantName'
//
// TODO:
// ----
// The cache allows user to construct the 'cached object' by supplying a callback. The cache implements a periodic
// refresh mechanism, polling underlying database for updates (add/remove tenants), in future we might want to implement
// database range-watch to monitor such updates
template <class T>
class TenantEntryCache : public ReferenceCounted<TenantEntryCache<T>>, NonCopyable {
private:
UID uid;
Database db;
TenantEntryCachePayloadFunc<T> createPayloadFunc;
TenantEntryCacheRefreshMode refreshMode;
Future<Void> refresher;
Map<int64_t, TenantEntryCachePayload<T>> mapByTenantId;
Map<TenantName, TenantEntryCachePayload<T>> mapByTenantName;
CounterCollection metrics;
Counter hits;
Counter misses;
Counter refreshByCacheInit;
Counter refreshByCacheMiss;
Counter numRefreshes;
ACTOR static Future<TenantNameEntryPairVec> getTenantList(Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
KeyBackedRangeResult<std::pair<TenantName, TenantMapEntry>> tenantList =
wait(TenantMetadata::tenantMap().getRange(
tr, Optional<TenantName>(), Optional<TenantName>(), CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER + 1));
ASSERT(tenantList.results.size() <= CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER && !tenantList.more);
TraceEvent(SevDebug, "TenantEntryCacheGetTenantList").detail("Count", tenantList.results.size());
return tenantList.results;
}
static void updateCacheRefreshMetrics(TenantEntryCache<T>* cache, TenantEntryCacheRefreshReason reason) {
if (reason == TenantEntryCacheRefreshReason::INIT) {
cache->refreshByCacheInit += 1;
} else if (reason == TenantEntryCacheRefreshReason::CACHE_MISS) {
cache->refreshByCacheMiss += 1;
}
cache->numRefreshes += 1;
}
ACTOR static Future<Void> refreshImpl(TenantEntryCache<T>* cache, TenantEntryCacheRefreshReason reason) {
TraceEvent(SevDebug, "TenantEntryCacheRefreshStart", cache->id()).detail("Reason", static_cast<int>(reason));
state Reference<ReadYourWritesTransaction> tr = cache->getDatabase()->createTransaction();
loop {
try {
state TenantNameEntryPairVec tenantList = wait(getTenantList(tr));
// Refresh cache entries reflecting the latest database state
cache->clear();
for (auto& tenant : tenantList) {
cache->put(tenant);
}
updateCacheRefreshMetrics(cache, reason);
break;
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
TraceEvent(SevInfo, "TenantEntryCacheRefreshError", cache->id())
.errorUnsuppressed(e)
.suppressFor(1.0);
}
wait(tr->onError(e));
}
}
TraceEvent(SevDebug, "TenantEntryCacheRefreshEnd", cache->id()).detail("Reason", static_cast<int>(reason));
return Void();
}
ACTOR static Future<Optional<TenantEntryCachePayload<T>>> getByIdImpl(TenantEntryCache<T>* cache,
int64_t tenantId) {
Optional<TenantEntryCachePayload<T>> ret = cache->lookupById(tenantId);
if (ret.present()) {
cache->hits += 1;
return ret;
}
TraceEvent(SevInfo, "TenantEntryCacheGetByIdRefresh").detail("TenantId", tenantId);
// Entry not found. Refresh cacheEntries by scanning underlying KeyRange.
// TODO: Cache will implement a "KeyRange" watch, monitoring notification when a new entry gets added or any
// existing entry gets updated within the KeyRange of interest. Hence, misses would be very rare
wait(refreshImpl(cache, TenantEntryCacheRefreshReason::CACHE_MISS));
cache->misses += 1;
return cache->lookupById(tenantId);
}
ACTOR static Future<Optional<TenantEntryCachePayload<T>>> getByNameImpl(TenantEntryCache<T>* cache,
TenantName name) {
Optional<TenantEntryCachePayload<T>> ret = cache->lookupByName(name);
if (ret.present()) {
cache->hits += 1;
return ret;
}
TraceEvent("TenantEntryCacheGetByNameRefresh").detail("TenantName", name);
// Entry not found. Refresh cacheEntries by scanning underlying KeyRange.
// TODO: Cache will implement a "KeyRange" watch, monitoring notification when a new entry gets added or any
// existing entry gets updated within the KeyRange of interest. Hence, misses would be very rare
wait(refreshImpl(cache, TenantEntryCacheRefreshReason::CACHE_MISS));
cache->misses += 1;
return cache->lookupByName(name);
}
Optional<TenantEntryCachePayload<T>> lookupById(int64_t tenantId) {
Optional<TenantEntryCachePayload<T>> ret;
auto itr = mapByTenantId.find(tenantId);
if (itr == mapByTenantId.end()) {
return ret;
}
return itr->value;
}
Optional<TenantEntryCachePayload<T>> lookupByName(TenantName name) {
Optional<TenantEntryCachePayload<T>> ret;
auto itr = mapByTenantName.find(name);
if (itr == mapByTenantName.end()) {
return ret;
}
return itr->value;
}
Future<Void> refresh(TenantEntryCacheRefreshReason reason) { return refreshImpl(this, reason); }
static TenantEntryCachePayload<Void> defaultCreatePayload(const TenantName& name, const TenantMapEntry& entry) {
TenantEntryCachePayload<Void> payload;
payload.name = name;
payload.entry = entry;
return payload;
}
Future<Void> removeEntryInt(Optional<int64_t> tenantId,
Optional<KeyRef> tenantPrefix,
Optional<TenantName> tenantName,
bool refreshCache) {
typename Map<int64_t, TenantEntryCachePayload<T>>::iterator itrId;
typename Map<TenantName, TenantEntryCachePayload<T>>::iterator itrName;
if (tenantId.present() || tenantPrefix.present()) {
// Ensure either tenantId OR tenantPrefix is valid (but not both)
ASSERT(tenantId.present() != tenantPrefix.present());
ASSERT(!tenantName.present());
int64_t tId = tenantId.present() ? tenantId.get() : TenantMapEntry::prefixToId(tenantPrefix.get());
TraceEvent("TenantEntryCacheRemoveEntry").detail("Id", tId);
itrId = mapByTenantId.find(tId);
if (itrId == mapByTenantId.end()) {
return Void();
}
// Ensure byId and byName cache are in-sync
itrName = mapByTenantName.find(itrId->value.name);
ASSERT(itrName != mapByTenantName.end());
} else if (tenantName.present()) {
ASSERT(!tenantId.present() && !tenantPrefix.present());
TraceEvent("TenantEntryCacheRemoveEntry").detail("Name", tenantName.get());
itrName = mapByTenantName.find(tenantName.get());
if (itrName == mapByTenantName.end()) {
return Void();
}
// Ensure byId and byName cache are in-sync
itrId = mapByTenantId.find(itrName->value.entry.id);
ASSERT(itrId != mapByTenantId.end());
} else {
// Invalid input, one of: tenantId, tenantPrefix or tenantName needs to be valid.
throw operation_failed();
}
ASSERT(itrId != mapByTenantId.end() && itrName != mapByTenantName.end());
TraceEvent("TenantEntryCacheRemoveEntry")
.detail("Id", itrId->key)
.detail("Prefix", itrId->value.entry.prefix)
.detail("Name", itrName->key);
mapByTenantId.erase(itrId);
mapByTenantName.erase(itrName);
if (refreshCache) {
return refreshImpl(this, TenantEntryCacheRefreshReason::REMOVE_ENTRY);
}
return Void();
}
public:
TenantEntryCache(Database db)
: uid(deterministicRandom()->randomUniqueID()), db(db), createPayloadFunc(defaultCreatePayload),
refreshMode(TenantEntryCacheRefreshMode::PERIODIC_TASK), metrics("TenantEntryCacheMetrics", uid.toString()),
hits("TenantEntryCacheHits", metrics), misses("TenantEntryCacheMisses", metrics),
refreshByCacheInit("TenantEntryCacheRefreshInit", metrics),
refreshByCacheMiss("TenantEntryCacheRefreshMiss", metrics),
numRefreshes("TenantEntryCacheNumRefreshes", metrics) {
TraceEvent("TenantEntryCacheCreatedDefaultFunc", uid);
}
TenantEntryCache(Database db, TenantEntryCachePayloadFunc<T> fn)
: uid(deterministicRandom()->randomUniqueID()), db(db), createPayloadFunc(fn),
refreshMode(TenantEntryCacheRefreshMode::PERIODIC_TASK), metrics("TenantEntryCacheMetrics", uid.toString()),
hits("TenantEntryCacheHits", metrics), misses("TenantEntryCacheMisses", metrics),
refreshByCacheInit("TenantEntryCacheRefreshInit", metrics),
refreshByCacheMiss("TenantEntryCacheRefreshMiss", metrics),
numRefreshes("TenantEntryCacheNumRefreshes", metrics) {
TraceEvent("TenantEntryCacheCreated", uid);
}
TenantEntryCache(Database db, UID id, TenantEntryCachePayloadFunc<T> fn)
: uid(id), db(db), createPayloadFunc(fn), refreshMode(TenantEntryCacheRefreshMode::PERIODIC_TASK),
metrics("TenantEntryCacheMetrics", uid.toString()), hits("TenantEntryCacheHits", metrics),
misses("TenantEntryCacheMisses", metrics), refreshByCacheInit("TenantEntryCacheRefreshInit", metrics),
refreshByCacheMiss("TenantEntryCacheRefreshMiss", metrics),
numRefreshes("TenantEntryCacheNumRefreshes", metrics) {
TraceEvent("TenantEntryCacheCreated", uid);
}
TenantEntryCache(Database db, UID id, TenantEntryCachePayloadFunc<T> fn, TenantEntryCacheRefreshMode mode)
: uid(id), db(db), createPayloadFunc(fn), refreshMode(mode), metrics("TenantEntryCacheMetrics", uid.toString()),
hits("TenantEntryCacheHits", metrics), misses("TenantEntryCacheMisses", metrics),
refreshByCacheInit("TenantEntryCacheRefreshInit", metrics),
refreshByCacheMiss("TenantEntryCacheRefreshMiss", metrics),
numRefreshes("TenantEntryCacheNumRefreshes", metrics) {
TraceEvent("TenantEntryCacheCreated", uid);
}
Future<Void> init() {
TraceEvent("TenantEntryCacheInit", uid);
Future<Void> f = refreshImpl(this, TenantEntryCacheRefreshReason::INIT);
// Launch reaper task to periodically refresh cache by scanning database KeyRange
TenantEntryCacheRefreshReason reason = TenantEntryCacheRefreshReason::PERIODIC_TASK;
if (refreshMode == TenantEntryCacheRefreshMode::PERIODIC_TASK) {
refresher = recurringAsync([&, reason]() { return refresh(reason); },
SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL, /* interval */
true, /* absoluteIntervalDelay */
SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL, /* intialDelay */
TaskPriority::Worker);
}
return f;
}
Database getDatabase() const { return db; }
UID id() const { return uid; }
void clear() {
mapByTenantId.clear();
mapByTenantName.clear();
}
Future<Void> removeEntryById(int64_t tenantId, bool refreshCache = false) {
return removeEntryInt(tenantId, Optional<KeyRef>(), Optional<TenantName>(), refreshCache);
}
Future<Void> removeEntryByPrefix(KeyRef tenantPrefix, bool refreshCache = false) {
return removeEntryInt(Optional<int64_t>(), tenantPrefix, Optional<TenantName>(), refreshCache);
}
Future<Void> removeEntryByName(TenantName tenantName, bool refreshCache = false) {
return removeEntryInt(Optional<int64_t>(), Optional<KeyRef>(), tenantName, refreshCache);
}
void put(const TenantNameEntryPair& pair) {
TenantEntryCachePayload<T> payload = createPayloadFunc(pair.first, pair.second);
auto idItr = mapByTenantId.find(pair.second.id);
auto nameItr = mapByTenantName.find(pair.first);
Optional<TenantName> existingName;
Optional<int64_t> existingId;
if (nameItr != mapByTenantName.end()) {
existingId = nameItr->value.entry.id;
mapByTenantId.erase(nameItr->value.entry.id);
}
if (idItr != mapByTenantId.end()) {
existingName = idItr->value.name;
mapByTenantName.erase(idItr->value.name);
}
mapByTenantId[pair.second.id] = payload;
mapByTenantName[pair.first] = payload;
TraceEvent("TenantEntryCachePut")
.detail("TenantName", pair.first)
.detail("TenantNameExisting", existingName)
.detail("TenantID", pair.second.id)
.detail("TenantIDExisting", existingId)
.detail("TenantPrefix", pair.second.prefix);
CODE_PROBE(idItr == mapByTenantId.end() && nameItr == mapByTenantName.end(), "TenantCache new entry");
CODE_PROBE(idItr != mapByTenantId.end() && nameItr == mapByTenantName.end(), "TenantCache entry name updated");
CODE_PROBE(idItr == mapByTenantId.end() && nameItr != mapByTenantName.end(), "TenantCache entry id updated");
CODE_PROBE(idItr != mapByTenantId.end() && nameItr != mapByTenantName.end(),
"TenantCache entry id and name updated");
}
Future<Optional<TenantEntryCachePayload<T>>> getById(int64_t tenantId) { return getByIdImpl(this, tenantId); }
Future<Optional<TenantEntryCachePayload<T>>> getByPrefix(KeyRef prefix) {
int64_t id = TenantMapEntry::prefixToId(prefix);
return getByIdImpl(this, id);
}
Future<Optional<TenantEntryCachePayload<T>>> getByName(TenantName name) { return getByNameImpl(this, name); }
// Counter access APIs
Counter::Value numCacheRefreshes() const { return numRefreshes.getValue(); }
Counter::Value numRefreshByMisses() const { return refreshByCacheMiss.getValue(); }
Counter::Value numRefreshByInit() const { return refreshByCacheInit.getValue(); }
};
#include "flow/unactorcompiler.h"
#endif // FDBSERVER_TENANTENTRYCACHE_ACTOR_H

View File

@ -0,0 +1,312 @@
/*
* TenantEntryCacheWorkload.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TenantEntryCache.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
TenantEntryCachePayload<int64_t> createPayload(const TenantName& name, const TenantMapEntry& entry) {
TenantEntryCachePayload<int64_t> payload;
payload.name = name;
payload.entry = entry;
payload.payload = entry.id;
return payload;
}
} // namespace
struct TenantEntryCacheWorkload : TestWorkload {
const TenantName tenantNamePrefix = "tenant_entry_cache_workload_"_sr;
TenantName localTenantNamePrefix;
int maxTenants;
int clientId;
TenantEntryCacheWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
clientId = wcx.clientId;
maxTenants = std::max(3, std::min<int>(1e8 - 1, getOption(options, "maxTenants"_sr, 1000)));
localTenantNamePrefix = format("%stenant_%d_", tenantNamePrefix.toString().c_str(), clientId);
}
~TenantEntryCacheWorkload() {}
static void compareTenants(Optional<TenantEntryCachePayload<int64_t>> left, TenantMapEntry& right) {
ASSERT(left.present());
ASSERT_EQ(left.get().entry.id, right.id);
ASSERT_EQ(left.get().entry.prefix.compare(right.prefix), 0);
ASSERT_EQ(left.get().payload, right.id);
}
ACTOR static Future<Void> compareContents(std::vector<std::pair<TenantName, TenantMapEntry>>* tenants,
Reference<TenantEntryCache<int64_t>> cache) {
state int i;
for (i = 0; i < tenants->size(); i++) {
if (deterministicRandom()->coinflip()) {
Optional<TenantEntryCachePayload<int64_t>> e = wait(cache->getById(tenants->at(i).second.id));
compareTenants(e, tenants->at(i).second);
} else {
Optional<TenantEntryCachePayload<int64_t>> e = wait(cache->getByName(tenants->at(i).first));
compareTenants(e, tenants->at(i).second);
}
}
return Void();
}
ACTOR static Future<Void> testTenantNotFound(Database cx) {
state Reference<TenantEntryCache<int64_t>> cache = makeReference<TenantEntryCache<int64_t>>(cx, createPayload);
TraceEvent("TenantNotFoundStart");
wait(cache->init());
// Ensure associated counter values gets updated
ASSERT_EQ(cache->numRefreshByInit(), 1);
state TenantMapEntry dummy(std::numeric_limits<int64_t>::max(), TenantState::READY, true /* encrypted */);
Optional<TenantEntryCachePayload<int64_t>> value = wait(cache->getById(dummy.id));
ASSERT(!value.present());
Optional<TenantEntryCachePayload<int64_t>> value1 = wait(cache->getByPrefix(dummy.prefix));
ASSERT(!value1.present());
// Ensure associated counter values gets updated
ASSERT_EQ(cache->numRefreshByMisses(), 2);
TraceEvent("TenantNotFoundEnd");
return Void();
}
ACTOR static Future<Void> testCreateTenantsAndLookup(
Database cx,
TenantEntryCacheWorkload* self,
std::vector<std::pair<TenantName, TenantMapEntry>>* tenantList) {
state Reference<TenantEntryCache<int64_t>> cache = makeReference<TenantEntryCache<int64_t>>(cx, createPayload);
state int nTenants = deterministicRandom()->randomInt(5, self->maxTenants);
TraceEvent("CreateTenantsAndLookupStart");
wait(cache->init());
// Ensure associated counter values gets updated
ASSERT_EQ(cache->numRefreshByInit(), 1);
ASSERT_GE(cache->numCacheRefreshes(), 1);
tenantList->clear();
state int i = 0;
state std::unordered_set<TenantName> tenantNames;
while (i < nTenants) {
state TenantName name(format("%s%08d",
self->localTenantNamePrefix.toString().c_str(),
deterministicRandom()->randomInt(0, self->maxTenants)));
if (tenantNames.find(name) != tenantNames.end()) {
continue;
}
Optional<TenantMapEntry> entry = wait(TenantAPI::createTenant(cx.getReference(), StringRef(name)));
ASSERT(entry.present());
tenantList->emplace_back(std::make_pair(name, entry.get()));
tenantNames.emplace(name);
i++;
}
wait(compareContents(tenantList, cache));
TraceEvent("CreateTenantsAndLookupEnd");
return Void();
}
ACTOR static Future<Void> testTenantInsert(Database cx,
TenantEntryCacheWorkload* self,
std::vector<std::pair<TenantName, TenantMapEntry>>* tenantList) {
state Reference<TenantEntryCache<int64_t>> cache = makeReference<TenantEntryCache<int64_t>>(cx, createPayload);
ASSERT(!tenantList->empty() && tenantList->size() >= 2);
TraceEvent("TestTenantInsertStart");
wait(cache->init());
// Ensure associated counter values gets updated
ASSERT_EQ(cache->numRefreshByInit(), 1);
ASSERT_GE(cache->numCacheRefreshes(), 1);
state std::pair<TenantName, TenantMapEntry> p = tenantList->at(0);
state Optional<TenantEntryCachePayload<int64_t>> entry;
// Tenant rename
p.first = TenantName(format("%s%08d",
self->localTenantNamePrefix.toString().c_str(),
deterministicRandom()->randomInt(self->maxTenants + 100, self->maxTenants + 200)));
cache->put(p);
Optional<TenantEntryCachePayload<int64_t>> e = wait(cache->getByName(p.first));
entry = e;
compareTenants(entry, p.second);
// Tenant delete & recreate
p.second.id = p.second.id + deterministicRandom()->randomInt(self->maxTenants + 500, self->maxTenants + 700);
cache->put(p);
Optional<TenantEntryCachePayload<int64_t>> e1 = wait(cache->getById(p.second.id));
entry = e1;
compareTenants(entry, p.second);
ASSERT_EQ(p.first.contents().toString().compare(entry.get().name.contents().toString()), 0);
// Delete a tenant and rename an existing TenantEntry to reuse the name of deleted tenant
state std::pair<TenantName, TenantMapEntry> p1 = tenantList->back();
tenantList->pop_back();
wait(TenantAPI::deleteTenant(cx.getReference(), p1.first));
cache->put(std::make_pair(p1.first, p.second));
Optional<TenantEntryCachePayload<int64_t>> e2 = wait(cache->getById(p.second.id));
entry = e2;
compareTenants(entry, p.second);
ASSERT_EQ(p1.first.contents().toString().compare(entry.get().name.contents().toString()), 0);
TraceEvent("TestTenantInsertEnd");
return Void();
}
ACTOR static Future<Void> testCacheReload(Database cx,
std::vector<std::pair<TenantName, TenantMapEntry>>* tenantList) {
state Reference<TenantEntryCache<int64_t>> cache = makeReference<TenantEntryCache<int64_t>>(cx, createPayload);
ASSERT(!tenantList->empty());
TraceEvent("CacheReloadStart");
wait(cache->init());
// Ensure associated counter values gets updated
ASSERT_EQ(cache->numRefreshByInit(), 1);
ASSERT_GE(cache->numCacheRefreshes(), 1);
wait(compareContents(tenantList, cache));
TraceEvent("CacheReloadEnd");
return Void();
}
ACTOR static Future<Void> testTenantCacheDefaultFunc(Database cx) {
wait(delay(0.0));
return Void();
}
ACTOR static Future<Void> testCacheRefresh(Database cx) {
state Reference<TenantEntryCache<int64_t>> cache = makeReference<TenantEntryCache<int64_t>>(cx, createPayload);
TraceEvent("TestCacheRefreshStart");
wait(cache->init());
// Ensure associated counter values gets updated
ASSERT_EQ(cache->numRefreshByInit(), 1);
ASSERT_GE(cache->numCacheRefreshes(), 1);
int refreshWait =
SERVER_KNOBS->TENANT_CACHE_LIST_REFRESH_INTERVAL * 10; // initial delay + multiple refresh runs
wait(delay(refreshWait));
// InitRefresh + multiple timer based invocations
ASSERT_GE(cache->numCacheRefreshes(), 3);
TraceEvent("TestCacheRefreshEnd");
return Void();
}
ACTOR static Future<Void> tenantEntryRemove(Database cx,
std::vector<std::pair<TenantName, TenantMapEntry>>* tenantList) {
state Reference<TenantEntryCache<int64_t>> cache = makeReference<TenantEntryCache<int64_t>>(
cx, deterministicRandom()->randomUniqueID(), createPayload, TenantEntryCacheRefreshMode::NONE);
wait(cache->init());
ASSERT(!tenantList->empty());
// Remove an entry from the cache
state int idx = deterministicRandom()->randomInt(0, tenantList->size());
Optional<TenantEntryCachePayload<int64_t>> entry = wait(cache->getByName(tenantList->at(idx).first));
ASSERT(entry.present());
TraceEvent("TestTenantEntryRemoveStart")
.detail("Id", tenantList->at(idx).second.id)
.detail("Name", tenantList->at(idx).first)
.detail("Prefix", tenantList->at(idx).second.prefix);
wait(TenantAPI::deleteTenant(cx.getReference(), tenantList->at(idx).first));
if (deterministicRandom()->coinflip()) {
wait(cache->removeEntryById(tenantList->at(idx).second.id));
} else if (deterministicRandom()->coinflip()) {
wait(cache->removeEntryByPrefix(tenantList->at(idx).second.prefix));
} else {
wait(cache->removeEntryByName(tenantList->at(idx).first));
}
state Optional<TenantEntryCachePayload<int64_t>> e = wait(cache->getById(tenantList->at(idx).second.id));
ASSERT(!e.present());
state Optional<TenantEntryCachePayload<int64_t>> e1 =
wait(cache->getByPrefix(tenantList->at(idx).second.prefix));
ASSERT(!e1.present());
state Optional<TenantEntryCachePayload<int64_t>> e2 = wait(cache->getByName(tenantList->at(idx).first));
ASSERT(!e2.present());
// Ensure remove-entry is an idempotent operation
cache->removeEntryByName(tenantList->at(idx).first);
Optional<TenantEntryCachePayload<int64_t>> e3 = wait(cache->getById(tenantList->at(idx).second.id));
ASSERT(!e3.present());
return Void();
}
Future<Void> setup(Database const& cx) override {
if (clientId == 0 && g_network->isSimulated() && BUGGIFY) {
IKnobCollection::getMutableGlobalKnobCollection().setKnob("tenant_cache_list_refresh_interval",
KnobValueRef::create(int{ 2 }));
}
return Void();
}
Future<Void> start(Database const& cx) override {
if (clientId == 0) {
return _start(cx, this);
}
return Void();
}
ACTOR Future<Void> _start(Database cx, TenantEntryCacheWorkload* self) {
state std::vector<std::pair<TenantName, TenantMapEntry>> tenantList;
wait(testTenantNotFound(cx));
wait(testCreateTenantsAndLookup(cx, self, &tenantList));
wait(testTenantInsert(cx, self, &tenantList));
wait(tenantEntryRemove(cx, &tenantList));
wait(testTenantCacheDefaultFunc(cx));
wait(testCacheRefresh(cx));
return Void();
}
std::string description() const override { return "TenantEntryCache"; }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<TenantEntryCacheWorkload> TenantEntryCacheWorkloadFactory("TenantEntryCache");

View File

@ -61,7 +61,9 @@ std::string getEncryptDbgTraceKeyWithTS(std::string_view prefix,
int64_t expAfterTS) {
// Construct the TraceEvent field key ensuring its uniqueness and compliance to TraceEvent field validator and log
// parsing tools
std::string dName = domainName.toString();
// Underscores are invalid in trace event detail name.
boost::replace_all(dName, "_", "-");
boost::format fmter("%s.%lld.%s.%llu.%lld.%lld");
return boost::str(
boost::format(fmter % prefix % domainId % domainName.toString() % baseCipherId % refAfterTS % expAfterTS));
return boost::str(boost::format(fmter % prefix % domainId % dName % baseCipherId % refAfterTS % expAfterTS));
}

View File

@ -224,14 +224,6 @@ Future<T> delayed(Future<T> what, double time = 0.0, TaskPriority taskID = TaskP
}
}
// wait <interval> then call what() in a loop forever
ACTOR template <class Func>
Future<Void> recurring(Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay) {
loop choose {
when(wait(delay(interval, taskID))) { what(); }
}
}
ACTOR template <class Func>
Future<Void> trigger(Func what, Future<Void> signal) {
wait(signal);
@ -1204,6 +1196,48 @@ inline Future<Void> operator||(Future<Void> const& lhs, Future<Void> const& rhs)
return chooseActor(lhs, rhs);
}
// wait <interval> then call what() in a loop forever
ACTOR template <class Func>
Future<Void> recurring(Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay) {
loop choose {
when(wait(delay(interval, taskID))) { what(); }
}
}
// Invoke actorFunc() forever in a loop
// At least wait<interval> between two actor functor invocations
ACTOR template <class F>
Future<Void> recurringAsync(
F actorFunc, // Callback actor functor
double interval, // Interval between two subsequent invocations of actor functor.
bool absoluteIntervalDelay, // Flag guarantees "interval" delay between two subequent actor functor invocations. If
// not selected, guarantees provided are "at least 'interval' delay" between two
// subsequent actor functor invocations, however, due to either 'poorly choose' interval
// value AND/OR actor functor taking longer than expected to return, could cause actor
// functor to run with no-delay
double initialDelay, // Initial delay interval
TaskPriority taskID = TaskPriority::DefaultDelay) {
wait(delay(initialDelay));
state Future<Void> val;
loop {
val = actorFunc();
if (absoluteIntervalDelay) {
wait(val);
// Ensure subsequent actorFunc executions observe client supplied delay interval.
wait(delay(interval));
} else {
// Guarantee at-least client supplied interval delay; two possible scenarios:
// 1. The actorFunc executions finishes before 'interval' delay
// 2. The actorFunc executions takes > 'interval' delay.
wait(val && delay(interval));
}
}
}
ACTOR template <class T>
Future<T> brokenPromiseToNever(Future<T> in) {
try {

View File

@ -184,6 +184,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/SystemRebootTestCycle.toml)
add_fdb_test(TEST_FILES fast/TaskBucketCorrectness.toml)
add_fdb_test(TEST_FILES fast/TenantCycle.toml)
add_fdb_test(TEST_FILES fast/TenantEntryCache.toml)
add_fdb_test(TEST_FILES fast/TimeKeeperCorrectness.toml)
add_fdb_test(TEST_FILES fast/TxnStateStoreCycleTest.toml)
add_fdb_test(TEST_FILES fast/UDP.toml)

View File

@ -0,0 +1,10 @@
[configuration]
allowDefaultTenant = false
allowDisablingTenants = false
[[test]]
testTitle = 'TenantEntryCacheTest'
[[test.workload]]
testName = 'TenantEntryCache'
maxTenants = 100