Move the storage quota monitor from DataDistributor to TenantCache
This commit is contained in:
parent
78e2871a79
commit
a8b1154e45
|
@ -286,8 +286,6 @@ public:
|
|||
PromiseStream<RelocateShard> relocationProducer, relocationConsumer;
|
||||
Reference<PhysicalShardCollection> physicalShardCollection;
|
||||
|
||||
StorageQuotaInfo storageQuotaInfo;
|
||||
|
||||
Promise<Void> initialized;
|
||||
|
||||
std::unordered_map<AuditType, std::vector<std::shared_ptr<DDAudit>>> audits;
|
||||
|
@ -542,27 +540,6 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> storageQuotaTracker(Database cx, StorageQuotaInfo* storageQuotaInfo) {
|
||||
loop {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
state RangeResult currentQuotas = wait(tr.getRange(storageQuotaKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
TraceEvent("StorageQuota_ReadCurrentQuotas").detail("Size", currentQuotas.size());
|
||||
for (auto const kv : currentQuotas) {
|
||||
Key const key = kv.key.removePrefix(storageQuotaPrefix);
|
||||
uint64_t const quota = BinaryReader::fromStringRef<uint64_t>(kv.value, Unversioned());
|
||||
storageQuotaInfo->quotaMap[key] = quota;
|
||||
}
|
||||
wait(delay(5.0));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Periodically check and log the physicalShard status; clean up empty physicalShard;
|
||||
ACTOR Future<Void> monitorPhysicalShardStatus(Reference<PhysicalShardCollection> self) {
|
||||
ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
|
||||
|
@ -683,16 +660,15 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
|
||||
actors.push_back(reportErrorsExcept(storageQuotaTracker(cx, &self->storageQuotaInfo),
|
||||
"StorageQuotaTracker",
|
||||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
|
||||
if (ddIsTenantAware) {
|
||||
actors.push_back(reportErrorsExcept(ddTenantCache.get()->monitorTenantMap(),
|
||||
"DDTenantCacheMonitor",
|
||||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
actors.push_back(reportErrorsExcept(ddTenantCache.get()->monitorstorageQuota(),
|
||||
"StorageQuotaTracker",
|
||||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
actors.push_back(reportErrorsExcept(ddTenantCache.get()->monitorStorageUsage(),
|
||||
"StorageUsageTracker",
|
||||
self->ddId,
|
||||
|
|
|
@ -149,6 +149,27 @@ public:
|
|||
wait(delay(refreshInterval));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorstorageQuota(TenantCache* tenantCache) {
|
||||
loop {
|
||||
state Transaction tr(tenantCache->dbcx());
|
||||
loop {
|
||||
try {
|
||||
state RangeResult currentQuotas = wait(tr.getRange(storageQuotaKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
TraceEvent("StorageQuota_ReadCurrentQuotas").detail("Size", currentQuotas.size());
|
||||
for (auto const kv : currentQuotas) {
|
||||
Key const key = kv.key.removePrefix(storageQuotaPrefix);
|
||||
uint64_t const quota = BinaryReader::fromStringRef<uint64_t>(kv.value, Unversioned());
|
||||
tenantCache->storageQuotaInfo.quotaMap[key] = quota;
|
||||
}
|
||||
wait(delay(5.0));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void TenantCache::insert(TenantName& tenantName, TenantMapEntry& tenant) {
|
||||
|
@ -272,6 +293,10 @@ Future<Void> TenantCache::monitorStorageUsage() {
|
|||
return TenantCacheImpl::monitorStorageUsage(this);
|
||||
}
|
||||
|
||||
Future<Void> TenantCache::monitorstorageQuota() {
|
||||
return TenantCacheImpl::monitorstorageQuota(this);
|
||||
}
|
||||
|
||||
class TenantCacheUnitTest {
|
||||
public:
|
||||
ACTOR static Future<Void> InsertAndTestPresence() {
|
||||
|
|
|
@ -484,10 +484,6 @@ ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
|
|||
// Determines the maximum shard size based on the size of the database
|
||||
int64_t getMaxShardSize(double dbSizeEstimate);
|
||||
|
||||
struct StorageQuotaInfo {
|
||||
std::map<Key, uint64_t> quotaMap;
|
||||
};
|
||||
|
||||
#ifndef __INTEL_COMPILER
|
||||
#pragma endregion
|
||||
#endif
|
||||
|
|
|
@ -50,6 +50,11 @@ private:
|
|||
uint64_t generation;
|
||||
TenantMapByPrefix tenantCache;
|
||||
|
||||
// Map from tenant names to storage quota
|
||||
struct StorageQuotaInfo {
|
||||
std::map<Key, uint64_t> quotaMap;
|
||||
} storageQuotaInfo;
|
||||
|
||||
// mark the start of a new sweep of the tenant cache
|
||||
void startRefresh();
|
||||
|
||||
|
@ -85,6 +90,8 @@ public:
|
|||
|
||||
Future<Void> monitorStorageUsage();
|
||||
|
||||
Future<Void> monitorstorageQuota();
|
||||
|
||||
std::string desc() const;
|
||||
|
||||
bool isTenantKey(KeyRef key) const;
|
||||
|
|
Loading…
Reference in New Issue