More change feed fetching improvements and optimizations

This commit is contained in:
Josh Slocum 2022-02-07 11:06:23 -06:00
parent c48ca9430d
commit 88cab7fb67
6 changed files with 42 additions and 24 deletions

View File

@ -78,6 +78,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( CHANGE_FEED_LOCATION_LIMIT, 10000 );
init( CHANGE_FEED_CACHE_SIZE, 100000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_SIZE = 1;
init( CHANGE_FEED_POP_TIMEOUT, 5.0 );
init( CHANGE_FEED_STREAM_MIN_BYTES, 1e4 ); if( randomize && BUGGIFY ) CHANGE_FEED_STREAM_MIN_BYTES = 1;
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;

View File

@ -77,6 +77,7 @@ public:
int64_t CHANGE_FEED_LOCATION_LIMIT;
int64_t CHANGE_FEED_CACHE_SIZE;
double CHANGE_FEED_POP_TIMEOUT;
int64_t CHANGE_FEED_STREAM_MIN_BYTES;
int MAX_BATCH_SIZE;
double GRV_BATCH_TIMEOUT;

View File

@ -7721,23 +7721,13 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
req.canReadPopped = canReadPopped;
// divide total buffer size among sub-streams, but keep individual streams large enough to be efficient
req.replyBufferSize = replyBufferSize / interfs.size();
if (replyBufferSize != -1 && req.replyBufferSize < CLIENT_KNOBS->REPLY_BYTE_LIMIT) {
req.replyBufferSize = CLIENT_KNOBS->REPLY_BYTE_LIMIT;
if (replyBufferSize != -1 && req.replyBufferSize < CLIENT_KNOBS->CHANGE_FEED_STREAM_MIN_BYTES) {
req.replyBufferSize = CLIENT_KNOBS->CHANGE_FEED_STREAM_MIN_BYTES;
}
UID debugID = deterministicRandom()->randomUniqueID();
debugIDs.push_back(debugID);
req.debugID = debugID;
results->streams.push_back(it.first.changeFeedStream.getReplyStream(req));
if (debugID.toString().substr(0, 8) == "637bfa4e") {
printf(
"Good example: %s: %p\n", debugID.toString().substr(0, 8).c_str(), results->streams.back().debugAddr());
printf("\n");
}
if (debugID.toString().substr(0, 8) == "1ad27675") {
printf(
"Bad example: %s: %p\n", debugID.toString().substr(0, 8).c_str(), results->streams.back().debugAddr());
printf("\n");
}
}
for (auto& it : results->storageData) {

View File

@ -619,6 +619,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FETCH_KEYS_PARALLELISM_BYTES, 4e6 ); if( randomize && BUGGIFY ) FETCH_KEYS_PARALLELISM_BYTES = 3e6;
init( FETCH_KEYS_PARALLELISM, 2 );
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
init( FETCH_CHANGEFEED_PARALLELISM, 2 );
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
init( STORAGE_FETCH_BYTES, 2500000 ); if( randomize && BUGGIFY ) STORAGE_FETCH_BYTES = 500000;

View File

@ -560,6 +560,7 @@ public:
int FETCH_KEYS_PARALLELISM_BYTES;
int FETCH_KEYS_PARALLELISM;
int FETCH_KEYS_LOWER_PRIORITY;
int FETCH_CHANGEFEED_PARALLELISM;
int BUGGIFY_BLOCK_BYTES;
double STORAGE_DURABILITY_LAG_REJECT_THRESHOLD;
double STORAGE_DURABILITY_LAG_MIN_RATE;

View File

@ -702,6 +702,7 @@ public:
FlowLock durableVersionLock;
FlowLock fetchKeysParallelismLock;
FlowLock fetchChangeFeedParallelismLock;
int64_t fetchKeysBytesBudget;
AsyncVar<bool> fetchKeysBudgetUsed;
std::vector<Promise<FetchInjectionInfo*>> readyFetchKeys;
@ -808,7 +809,7 @@ public:
CounterCollection cc;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getRangeAndFlatMapQueries,
getRangeStreamQueries, finishedQueries, lowPriorityQueries, rowsQueried, bytesQueried, watchQueries,
emptyQueries;
emptyQueries, feedRowsQueried, feedBytesQueried;
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
@ -824,6 +825,9 @@ public:
// and the lengths of both parameters.
Counter mutationBytes;
// Bytes fetched by fetchChangeFeed for data movements.
Counter feedBytesFetched;
Counter sampledBytesCleared;
// The number of key-value pairs fetched by fetchKeys()
Counter kvFetched;
@ -850,17 +854,19 @@ public:
getRangeStreamQueries("GetRangeStreamQueries", cc), finishedQueries("FinishedQueries", cc),
lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc),
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
feedRowsQueried("FeedRowsQueried", cc), feedBytesQueried("FeedBytesQueried", cc),
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc),
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc),
fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc),
readsRejected("ReadsRejected", cc), wrongShardServer("WrongShardServer", cc),
fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc),
quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc),
quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc),
mutationBytes("MutationBytes", cc), feedBytesFetched("FeedBytesFetched", cc),
sampledBytesCleared("SampledBytesCleared", cc), kvFetched("KVFetched", cc), mutations("Mutations", cc),
setMutations("SetMutations", cc), clearRangeMutations("ClearRangeMutations", cc),
atomicMutations("AtomicMutations", cc), updateBatches("UpdateBatches", cc),
updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc),
fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc),
wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc),
fetchesFromLogs("FetchesFromLogs", cc), quickGetValueHit("QuickGetValueHit", cc),
quickGetValueMiss("QuickGetValueMiss", cc), quickGetKeyValuesHit("QuickGetKeyValuesHit", cc),
quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc),
readLatencySample("ReadLatencyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
@ -878,6 +884,11 @@ public:
specialCounter(
cc, "FetchKeysFetchActive", [self]() { return self->fetchKeysParallelismLock.activePermits(); });
specialCounter(cc, "FetchKeysWaiting", [self]() { return self->fetchKeysParallelismLock.waiters(); });
specialCounter(cc, "FetchChangeFeedFetchActive", [self]() {
return self->fetchChangeFeedParallelismLock.activePermits();
});
specialCounter(
cc, "FetchChangeFeedWaiting", [self]() { return self->fetchChangeFeedParallelismLock.waiters(); });
specialCounter(cc, "QueryQueueMax", [self]() { return self->getAndResetMaxQueryQueueSize(); });
specialCounter(cc, "BytesStored", [self]() { return self->metrics.byteSample.getEstimate(allKeys); });
specialCounter(cc, "ActiveWatches", [self]() { return self->numWatches; });
@ -926,6 +937,7 @@ public:
numWatches(0), noRecentUpdates(false), lastUpdate(now()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), updateEagerReads(nullptr),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchChangeFeedParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
@ -2114,6 +2126,9 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
feedReply.atLatestVersion = atLatest;
feedReply.minStreamVersion = minVersion;
data->counters.feedRowsQueried += feedReply.mutations.size();
data->counters.feedBytesQueried += feedReply.mutations.expectedSize();
req.reply.send(feedReply);
if (feedReply.mutations.back().version == req.end - 1) {
req.reply.sendError(end_of_stream());
@ -4135,7 +4150,11 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
Version endVersion,
bool existing) {
state Version startVersion = std::max(beginVersion, emptyVersion + 1);
state Version startVersion = beginVersion;
startVersion = std::max(startVersion, emptyVersion + 1);
startVersion = std::max(startVersion, changeFeedInfo->fetchVersion + 1);
startVersion = std::max(startVersion, changeFeedInfo->durableFetchVersion.get() + 1);
ASSERT(startVersion >= 0);
if (startVersion >= endVersion) {
@ -4239,6 +4258,7 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
localResult = _localResult;
}
}
data->counters.feedBytesFetched += remoteResult.expectedSize();
data->fetchKeysBytesBudget -= remoteResult.expectedSize();
if (data->fetchKeysBytesBudget <= 0) {
data->fetchKeysBudgetUsed.set(true);
@ -4283,6 +4303,10 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
Version endVersion) {
wait(delay(0)); // allow this actor to be cancelled by removals
// bound active change feed fetches
wait(data->fetchChangeFeedParallelismLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holdingFCFPL(data->fetchChangeFeedParallelismLock);
TraceEvent(SevDebug, "FetchChangeFeed", data->thisServerID)
.detail("RangeID", changeFeedInfo->id.printable())
.detail("Range", changeFeedInfo->range.toString())