Including change feed bytes towards storage queue (#8107)

This commit is contained in:
Josh Slocum 2022-09-08 16:37:44 -07:00 committed by GitHub
parent 3887ed5409
commit e66015cbe4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 0 deletions

View File

@ -642,6 +642,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( STORAGE_HARD_LIMIT_VERSION_OVERAGE, VERSIONS_PER_SECOND / 4.0 );
init( STORAGE_DURABILITY_LAG_HARD_MAX, 2000e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_HARD_MAX = 100e6;
init( STORAGE_DURABILITY_LAG_SOFT_MAX, 250e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_SOFT_MAX = 10e6;
init( STORAGE_INCLUDE_FEED_STORAGE_QUEUE, true ); if ( randomize && BUGGIFY ) STORAGE_INCLUDE_FEED_STORAGE_QUEUE = false;
//FIXME: Low priority reads are disabled by assigning very high knob values, reduce knobs for 7.0
init( LOW_PRIORITY_STORAGE_QUEUE_BYTES, 775e8 ); if( smallStorageTarget ) LOW_PRIORITY_STORAGE_QUEUE_BYTES = 1750e3;

View File

@ -573,6 +573,7 @@ public:
int64_t STORAGE_HARD_LIMIT_VERSION_OVERAGE;
int64_t STORAGE_DURABILITY_LAG_HARD_MAX;
int64_t STORAGE_DURABILITY_LAG_SOFT_MAX;
bool STORAGE_INCLUDE_FEED_STORAGE_QUEUE;
int64_t LOW_PRIORITY_STORAGE_QUEUE_BYTES;
int64_t LOW_PRIORITY_DURABILITY_LAG;

View File

@ -940,6 +940,8 @@ public:
std::unordered_map<NetworkAddress, std::map<UID, Version>> changeFeedClientVersions;
std::unordered_map<Key, Version> changeFeedCleanupDurable;
int64_t activeFeedQueries = 0;
int64_t changeFeedMemoryBytes = 0;
std::deque<std::pair<Version, int64_t>> feedMemoryBytesByVersion;
// newestAvailableVersion[k]
// == invalidVersion -> k is unavailable at all versions
@ -1201,6 +1203,7 @@ public:
specialCounter(cc, "KvstoreInlineKey", [self]() { return std::get<2>(self->storage.getSize()); });
specialCounter(cc, "ActiveChangeFeeds", [self]() { return self->uidChangeFeed.size(); });
specialCounter(cc, "ActiveChangeFeedQueries", [self]() { return self->activeFeedQueries; });
specialCounter(cc, "ChangeFeedMemoryBytes", [self]() { return self->changeFeedMemoryBytes; });
}
} counters;
@ -1440,6 +1443,20 @@ public:
return minVersion;
}
// count in-memory change feed bytes towards storage queue size, for the purposes of memory management and
// throttling
void addFeedBytesAtVersion(int64_t bytes, Version version) {
if (feedMemoryBytesByVersion.empty() || version != feedMemoryBytesByVersion.back().first) {
ASSERT(feedMemoryBytesByVersion.empty() || version >= feedMemoryBytesByVersion.back().first);
feedMemoryBytesByVersion.push_back({ version, 0 });
}
feedMemoryBytesByVersion.back().second += bytes;
changeFeedMemoryBytes += bytes;
if (SERVER_KNOBS->STORAGE_INCLUDE_FEED_STORAGE_QUEUE) {
counters.bytesInput += bytes;
}
}
void getSplitPoints(SplitRangeRequest const& req) {
try {
Optional<TenantMapEntry> entry = getTenantEntry(version.get(), req.tenantInfo);
@ -5076,6 +5093,17 @@ bool changeDurableVersion(StorageServer* data, Version desiredDurableVersion) {
data->counters.bytesDurable += bytesDurable;
}
int64_t feedBytesDurable = 0;
while (!data->feedMemoryBytesByVersion.empty() &&
data->feedMemoryBytesByVersion.front().first <= desiredDurableVersion) {
feedBytesDurable += data->feedMemoryBytesByVersion.front().second;
data->feedMemoryBytesByVersion.pop_front();
}
data->changeFeedMemoryBytes -= feedBytesDurable;
if (SERVER_KNOBS->STORAGE_INCLUDE_FEED_STORAGE_QUEUE) {
data->counters.bytesDurable += feedBytesDurable;
}
if (EXPENSIVE_VALIDATION) {
// Check that the above loop did its job
auto view = data->data().atLatest();
@ -5261,6 +5289,7 @@ void applyChangeFeedMutation(StorageServer* self, MutationRef const& m, Version
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), m);
self->currentChangeFeeds.insert(it->id);
self->addFeedBytesAtVersion(m.totalSize(), version);
DEBUG_MUTATION("ChangeFeedWriteSet", version, m, self->thisServerID)
.detail("Range", it->range)
@ -5289,6 +5318,8 @@ void applyChangeFeedMutation(StorageServer* self, MutationRef const& m, Version
}
it->mutations.back().mutations.push_back_deep(it->mutations.back().arena(), m);
self->currentChangeFeeds.insert(it->id);
self->addFeedBytesAtVersion(m.totalSize(), version);
DEBUG_MUTATION("ChangeFeedWriteClear", version, m, self->thisServerID)
.detail("Range", it->range)
.detail("ChangeFeedID", it->id);