Merge pull request #8176 from sfc-gh-jslocum/ss_cf_burst_fix_main

Fixing Thundering Herd problem of change feed stream retries in SS
This commit is contained in:
Josh Slocum 2022-09-14 16:01:20 -05:00 committed by GitHub
commit d4ba6c266c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 101 additions and 27 deletions

View File

@ -604,7 +604,8 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
loop {
wait(delay(CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskPriority::FlushTrace));
if (!g_network->isSimulated()) {
bool logTraces = !g_network->isSimulated() || BUGGIFY_WITH_PROB(0.01);
if (logTraces) {
TraceEvent ev("TransactionMetrics", cx->dbId);
ev.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
@ -657,6 +658,19 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
cx->bgLatencies.clear();
cx->bgGranulesPerRequest.clear();
if (cx->usedAnyChangeFeeds && logTraces) {
TraceEvent feedEv("ChangeFeedClientMetrics", cx->dbId);
feedEv.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
.detail("Cluster",
cx->getConnectionRecord()
? cx->getConnectionRecord()->getConnectionString().clusterKeyName().toString()
: "")
.detail("Internal", cx->internal);
cx->ccFeed.logToTraceEvent(feedEv);
}
lastLogged = now();
}
}
@ -1466,9 +1480,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), usedAnyChangeFeeds(false),
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000),
outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
@ -1764,9 +1782,13 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
bgGranulesPerRequest(1000), sharedStatePtr(nullptr), transactionTracingSample(false),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), usedAnyChangeFeeds(false),
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000),
sharedStatePtr(nullptr), transactionTracingSample(false),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
@ -9512,6 +9534,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
bool canReadPopped) {
state Database cx(db);
state Span span("NAPI:GetChangeFeedStream"_loc);
db->usedAnyChangeFeeds = true;
results->endVersion = end;
@ -9587,7 +9610,10 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
loc = 0;
}
++db->feedStreamStarts;
if (locations.size() > 1) {
++db->feedMergeStreamStarts;
std::vector<std::pair<StorageServerInterface, KeyRange>> interfs;
for (int i = 0; i < locations.size(); i++) {
interfs.emplace_back(locations[i].locations->getInterface(chosenLocations[i]),
@ -9610,6 +9636,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
results->streams.clear();
results->storageData.clear();
if (e.code() == error_code_change_feed_popped) {
++db->feedNonRetriableErrors;
CODE_PROBE(true, "getChangeFeedStreamActor got popped");
results->mutations.sendError(e);
results->refresh.sendError(e);
@ -9629,18 +9656,27 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
e.code() == error_code_connection_failed || e.code() == error_code_unknown_change_feed ||
e.code() == error_code_broken_promise || e.code() == error_code_future_version ||
e.code() == error_code_request_maybe_delivered) {
e.code() == error_code_request_maybe_delivered ||
e.code() == error_code_storage_too_many_feed_streams) {
++db->feedErrors;
db->changeFeedCache.erase(rangeID);
cx->invalidateCache(Key(), keys);
if (begin == lastBeginVersion) {
if (begin == lastBeginVersion || e.code() == error_code_storage_too_many_feed_streams) {
// We didn't read anything since the last failure before failing again.
// Do exponential backoff, up to 1 second
sleepWithBackoff = std::min(1.0, sleepWithBackoff * 1.5);
// Back off quickly and exponentially, up to 1 second
sleepWithBackoff = std::min(2.0, sleepWithBackoff * 5);
sleepWithBackoff = std::max(0.1, sleepWithBackoff);
} else {
sleepWithBackoff = CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY;
}
TraceEvent("ChangeFeedClientError")
.errorUnsuppressed(e)
.suppressFor(30.0)
.detail("AnyProgress", begin != lastBeginVersion);
wait(delay(sleepWithBackoff));
} else {
++db->feedNonRetriableErrors;
TraceEvent("ChangeFeedClientErrorNonRetryable").errorUnsuppressed(e).suppressFor(5.0);
results->mutations.sendError(e);
results->refresh.sendError(change_feed_cancelled());
results->streams.clear();
@ -9767,6 +9803,7 @@ Future<OverlappingChangeFeedsInfo> DatabaseContext::getOverlappingChangeFeeds(Ke
}
ACTOR static Future<Void> popChangeFeedBackup(Database cx, Key rangeID, Version version) {
++cx->feedPopsFallback;
state Transaction tr(cx);
loop {
try {
@ -9804,6 +9841,8 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Ke
state Database cx(db);
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
state Span span("NAPI:PopChangeFeedMutations"_loc);
db->usedAnyChangeFeeds = true;
++db->feedPops;
state KeyRange keys = wait(getChangeFeedRange(db, cx, rangeID));

View File

@ -684,7 +684,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
bool buggifySmallBWLag = randomize && BUGGIFY;
init( TARGET_BW_LAG, 50.0 ); if(buggifySmallBWLag) TARGET_BW_LAG = 10.0;
init( TARGET_BW_LAG_BATCH, 20.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_BATCH = 4.0;
init( TARGET_BW_LAG_BATCH, 30.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_BATCH = 4.0;
init( TARGET_BW_LAG_UPDATE, 9.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_UPDATE = 1.0;
init( MIN_BW_HISTORY, 10 );
init( BW_ESTIMATION_INTERVAL, 10.0 ); if(buggifySmallBWLag) BW_ESTIMATION_INTERVAL = 2.0;
@ -738,6 +738,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FETCH_KEYS_PARALLELISM_FULL, 6 );
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
init( SERVE_FETCH_CHECKPOINT_PARALLELISM, 4 );
init( CHANGE_FEED_DISK_READS_PARALLELISM, 1000 ); if( randomize && BUGGIFY ) CHANGE_FEED_DISK_READS_PARALLELISM = 20;
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
@ -776,6 +777,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( MAX_PARALLEL_QUICK_GET_VALUE, 50 ); if ( randomize && BUGGIFY ) MAX_PARALLEL_QUICK_GET_VALUE = deterministicRandom()->randomInt(1, 100);
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
init( STORAGE_FEED_QUERY_HARD_LIMIT, 100000 );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@ -545,6 +545,16 @@ public:
Counter transactionGrvTimedOutBatches;
Counter transactionCommitVersionNotFoundForSS;
// Change Feed metrics. Omit change feed metrics from logging if not used
bool usedAnyChangeFeeds;
CounterCollection ccFeed;
Counter feedStreamStarts;
Counter feedMergeStreamStarts;
Counter feedErrors;
Counter feedNonRetriableErrors;
Counter feedPops;
Counter feedPopsFallback;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
bytesPerCommit, bgLatencies, bgGranulesPerRequest;

View File

@ -693,6 +693,7 @@ public:
int FETCH_KEYS_PARALLELISM_FULL;
int FETCH_KEYS_LOWER_PRIORITY;
int SERVE_FETCH_CHECKPOINT_PARALLELISM;
int CHANGE_FEED_DISK_READS_PARALLELISM;
int BUGGIFY_BLOCK_BYTES;
int64_t STORAGE_RECOVERY_VERSION_LAG_LIMIT;
double STORAGE_DURABILITY_LAG_REJECT_THRESHOLD;
@ -731,6 +732,7 @@ public:
int CHECKPOINT_TRANSFER_BLOCK_BYTES;
int QUICK_GET_KEY_VALUES_LIMIT;
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
int STORAGE_FEED_QUERY_HARD_LIMIT;
// Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;

View File

@ -1016,7 +1016,7 @@ public:
// Extra lock that prevents too much post-initial-fetch work from building up, such as mutation applying and change
// feed tail fetching
FlowLock fetchKeysParallelismFullLock;
FlowLock fetchChangeFeedParallelismLock;
FlowLock changeFeedDiskReadsLock;
int64_t fetchKeysBytesBudget;
AsyncVar<bool> fetchKeysBudgetUsed;
std::vector<Promise<FetchInjectionInfo*>> readyFetchKeys;
@ -1057,7 +1057,8 @@ public:
CounterCollection cc;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, getMappedRangeQueries,
getRangeStreamQueries, finishedQueries, lowPriorityQueries, rowsQueried, bytesQueried, watchQueries,
emptyQueries, feedRowsQueried, feedBytesQueried, feedStreamQueries, feedVersionQueries;
emptyQueries, feedRowsQueried, feedBytesQueried, feedStreamQueries, rejectedFeedStreamQueries,
feedVersionQueries;
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
@ -1135,9 +1136,9 @@ public:
lowPriorityQueries("LowPriorityQueries", cc), rowsQueried("RowsQueried", cc),
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
feedRowsQueried("FeedRowsQueried", cc), feedBytesQueried("FeedBytesQueried", cc),
feedStreamQueries("FeedStreamQueries", cc), feedVersionQueries("FeedVersionQueries", cc),
bytesInput("BytesInput", cc), logicalBytesInput("LogicalBytesInput", cc),
logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc),
feedStreamQueries("FeedStreamQueries", cc), rejectedFeedStreamQueries("RejectedFeedStreamQueries", cc),
feedVersionQueries("FeedVersionQueries", cc), bytesInput("BytesInput", cc),
logicalBytesInput("LogicalBytesInput", cc), logicalBytesMoveInOverhead("LogicalBytesMoveInOverhead", cc),
kvCommitLogicalBytes("KVCommitLogicalBytes", cc), kvClearRanges("KVClearRanges", cc),
kvSystemClearRanges("KVSystemClearRanges", cc), bytesDurable("BytesDurable", cc),
bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc),
@ -1195,6 +1196,10 @@ public:
specialCounter(cc, "ServeFetchCheckpointWaiting", [self]() {
return self->serveFetchCheckpointParallelismLock.waiters();
});
specialCounter(
cc, "ChangeFeedDiskReadsActive", [self]() { return self->changeFeedDiskReadsLock.activePermits(); });
specialCounter(
cc, "ChangeFeedDiskReadsWaiting", [self]() { return self->changeFeedDiskReadsLock.waiters(); });
specialCounter(cc, "QueryQueueMax", [self]() { return self->getAndResetMaxQueryQueueSize(); });
specialCounter(cc, "BytesStored", [self]() { return self->metrics.byteSample.getEstimate(allKeys); });
specialCounter(cc, "ActiveWatches", [self]() { return self->numWatches; });
@ -1254,6 +1259,7 @@ public:
numWatches(0), noRecentUpdates(false), lastUpdate(now()), updateEagerReads(nullptr),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM),
fetchKeysParallelismFullLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_FULL),
changeFeedDiskReadsLock(SERVER_KNOBS->CHANGE_FEED_DISK_READS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
@ -2622,12 +2628,16 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
// To let update storage finish
wait(delay(0));
}
wait(data->changeFeedDiskReadsLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holdingDiskReadsLock(data->changeFeedDiskReadsLock);
RangeResult res = wait(
data->storage.readRange(KeyRangeRef(changeFeedDurableKey(req.rangeID, std::max(req.begin, emptyVersion)),
changeFeedDurableKey(req.rangeID, req.end)),
1 << 30,
remainingDurableBytes,
options));
holdingDiskReadsLock.release();
data->counters.kvScanBytes += res.logicalSize();
++data->counters.changeFeedDiskReads;
@ -2964,17 +2974,28 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
state bool atLatest = false;
state bool removeUID = false;
state Optional<Version> blockedVersion;
if (req.replyBufferSize <= 0) {
req.reply.setByteLimit(SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES);
} else {
req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
}
++data->counters.feedStreamQueries;
wait(delay(0, TaskPriority::DefaultEndpoint));
try {
++data->counters.feedStreamQueries;
// FIXME: do something more sophisticated here besides hard limit
if (data->activeFeedQueries >= SERVER_KNOBS->STORAGE_FEED_QUERY_HARD_LIMIT ||
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.005))) {
req.reply.sendError(storage_too_many_feed_streams());
++data->counters.rejectedFeedStreamQueries;
return Void();
}
data->activeFeedQueries++;
if (req.replyBufferSize <= 0) {
req.reply.setByteLimit(SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES);
} else {
req.reply.setByteLimit(std::min((int64_t)req.replyBufferSize, SERVER_KNOBS->CHANGEFEEDSTREAM_LIMIT_BYTES));
}
wait(delay(0, TaskPriority::DefaultEndpoint));
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "TraceChangeFeedStreamStart", data->thisServerID)
.detail("FeedID", req.rangeID)
@ -2985,7 +3006,6 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
.detail("CanReadPopped", req.canReadPopped)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
data->activeFeedQueries++;
wait(success(waitForVersionNoTooOld(data, req.begin)));

View File

@ -100,6 +100,7 @@ ERROR( data_move_dest_team_not_found, 1076, "Dest team was not found for data mo
ERROR( blob_worker_full, 1077, "Blob worker cannot take on more granule assignments" )
ERROR( grv_proxy_memory_limit_exceeded, 1078, "GetReadVersion proxy memory limit exceeded" )
ERROR( blob_granule_request_failed, 1079, "BlobGranule request failed" )
ERROR( storage_too_many_feed_streams, 1080, "Too many feed streams to a single storage server" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )