Improve the storage quota monitor code + add a knob for refresh interval
This commit is contained in:
parent
a8b1154e45
commit
7647cea4e5
|
@ -297,7 +297,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC, isSimulated ? 2 : 21 * 60 * 60 * 24 ); if(randomize && BUGGIFY) DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC = isSimulated ? 0: 120;
|
||||
init( DD_TENANT_AWARENESS_ENABLED, false );
|
||||
init( TENANT_CACHE_LIST_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_LIST_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
|
||||
init( TENANT_CACHE_STORAGE_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
|
||||
init( TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL, 2 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
|
||||
init( TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL = deterministicRandom()->randomInt(1, 10);
|
||||
|
||||
// TeamRemover
|
||||
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
|
||||
|
|
|
@ -237,8 +237,10 @@ public:
|
|||
DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC; // Minimal age of a correct-configured server before it's chosen to be wiggled
|
||||
bool DD_TENANT_AWARENESS_ENABLED;
|
||||
int TENANT_CACHE_LIST_REFRESH_INTERVAL; // How often the TenantCache is refreshed
|
||||
int TENANT_CACHE_STORAGE_REFRESH_INTERVAL; // How often the storage bytes used by each tenant in the TenantCache is
|
||||
// refreshed
|
||||
int TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL; // How often the storage bytes used by each tenant is refreshed
|
||||
// in the TenantCache
|
||||
int TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL; // How often the storage quota allocated to each tenant is
|
||||
// refreshed in the TenantCache
|
||||
|
||||
// TeamRemover to remove redundant teams
|
||||
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor
|
||||
|
|
|
@ -665,7 +665,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
"DDTenantCacheMonitor",
|
||||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
actors.push_back(reportErrorsExcept(ddTenantCache.get()->monitorstorageQuota(),
|
||||
actors.push_back(reportErrorsExcept(ddTenantCache.get()->monitorStorageQuota(),
|
||||
"StorageQuotaTracker",
|
||||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
|
|
|
@ -122,7 +122,7 @@ public:
|
|||
ACTOR static Future<Void> monitorStorageUsage(TenantCache* tenantCache) {
|
||||
TraceEvent(SevInfo, "StartingTenantCacheStorageUsageMonitor", tenantCache->id()).log();
|
||||
|
||||
state int refreshInterval = SERVER_KNOBS->TENANT_CACHE_STORAGE_REFRESH_INTERVAL;
|
||||
state int refreshInterval = SERVER_KNOBS->TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL;
|
||||
state double lastTenantListFetchTime = now();
|
||||
|
||||
loop {
|
||||
|
@ -150,21 +150,23 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorstorageQuota(TenantCache* tenantCache) {
|
||||
ACTOR static Future<Void> monitorStorageQuota(TenantCache* tenantCache) {
|
||||
TraceEvent(SevInfo, "StartingTenantCacheStorageQuotaMonitor", tenantCache->id()).log();
|
||||
|
||||
loop {
|
||||
state Transaction tr(tenantCache->dbcx());
|
||||
loop {
|
||||
try {
|
||||
state RangeResult currentQuotas = wait(tr.getRange(storageQuotaKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
TraceEvent("StorageQuota_ReadCurrentQuotas").detail("Size", currentQuotas.size());
|
||||
for (auto const kv : currentQuotas) {
|
||||
Key const key = kv.key.removePrefix(storageQuotaPrefix);
|
||||
TenantName const tenant = kv.key.removePrefix(storageQuotaPrefix);
|
||||
uint64_t const quota = BinaryReader::fromStringRef<uint64_t>(kv.value, Unversioned());
|
||||
tenantCache->storageQuotaInfo.quotaMap[key] = quota;
|
||||
tenantCache->tenantStorageMap[tenant] = quota;
|
||||
}
|
||||
wait(delay(5.0));
|
||||
wait(delay(SERVER_KNOBS->TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TenantCacheGetStorageQuotaError", tenantCache->id()).error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
@ -293,8 +295,8 @@ Future<Void> TenantCache::monitorStorageUsage() {
|
|||
return TenantCacheImpl::monitorStorageUsage(this);
|
||||
}
|
||||
|
||||
Future<Void> TenantCache::monitorstorageQuota() {
|
||||
return TenantCacheImpl::monitorstorageQuota(this);
|
||||
Future<Void> TenantCache::monitorStorageQuota() {
|
||||
return TenantCacheImpl::monitorStorageQuota(this);
|
||||
}
|
||||
|
||||
class TenantCacheUnitTest {
|
||||
|
|
|
@ -32,6 +32,8 @@
|
|||
|
||||
typedef Map<KeyRef, Reference<TCTenantInfo>> TenantMapByPrefix;
|
||||
|
||||
typedef std::unordered_map<TenantName, uint64_t> TenantStorageMap;
|
||||
|
||||
struct TenantCacheTenantCreated {
|
||||
KeyRange keys;
|
||||
Promise<bool> reply;
|
||||
|
@ -51,9 +53,7 @@ private:
|
|||
TenantMapByPrefix tenantCache;
|
||||
|
||||
// Map from tenant names to storage quota
|
||||
struct StorageQuotaInfo {
|
||||
std::map<Key, uint64_t> quotaMap;
|
||||
} storageQuotaInfo;
|
||||
TenantStorageMap tenantStorageMap;
|
||||
|
||||
// mark the start of a new sweep of the tenant cache
|
||||
void startRefresh();
|
||||
|
@ -90,7 +90,7 @@ public:
|
|||
|
||||
Future<Void> monitorStorageUsage();
|
||||
|
||||
Future<Void> monitorstorageQuota();
|
||||
Future<Void> monitorStorageQuota();
|
||||
|
||||
std::string desc() const;
|
||||
|
||||
|
|
Loading…
Reference in New Issue