Properly handle proxy_memory_limit_exceeded error for GetKeyServerLocationsRequest

Also add buggify to inject the error in simulation.
Jingyu Zhou 2023-04-18 15:13:29 -07:00
parent d5473b0cd4
commit b49625d45b
3 changed files with 62 additions and 33 deletions


@@ -3117,36 +3117,51 @@ ACTOR Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations_internal(
     if (debugID.present())
         g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before");
 
+    state Transaction tr(cx); // Only used for exponential backoff
     loop {
-        ++cx->transactionKeyServerLocationRequests;
-        choose {
-            when(wait(cx->onProxiesChanged())) {}
-            when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
-                     cx->getCommitProxies(useProvisionalProxies),
-                     &CommitProxyInterface::getKeyServersLocations,
-                     GetKeyServerLocationsRequest(
-                         span.context, tenant, keys.begin, keys.end, limit, reverse, version, keys.arena()),
-                     TaskPriority::DefaultPromiseEndpoint))) {
-                ++cx->transactionKeyServerLocationRequestsCompleted;
-                state GetKeyServerLocationsReply rep = _rep;
-                if (debugID.present())
-                    g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.After");
-                ASSERT(rep.results.size());
-
-                state std::vector<KeyRangeLocationInfo> results;
-                state int shard = 0;
-                for (; shard < rep.results.size(); shard++) {
-                    // FIXME: these shards are being inserted into the map sequentially, it would be much more CPU
-                    // efficient to save the map pairs and insert them all at once.
-                    results.emplace_back((toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys),
-                                         cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second));
-                    wait(yield());
-                }
-                updateTssMappings(cx, rep);
-                updateTagMappings(cx, rep);
-
-                return results;
-            }
-        }
+        try {
+            ++cx->transactionKeyServerLocationRequests;
+            choose {
+                when(wait(cx->onProxiesChanged())) {
+                    tr.reset();
+                }
+                when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
+                         cx->getCommitProxies(useProvisionalProxies),
+                         &CommitProxyInterface::getKeyServersLocations,
+                         GetKeyServerLocationsRequest(
+                             span.context, tenant, keys.begin, keys.end, limit, reverse, version, keys.arena()),
+                         TaskPriority::DefaultPromiseEndpoint))) {
+                    ++cx->transactionKeyServerLocationRequestsCompleted;
+                    state GetKeyServerLocationsReply rep = _rep;
+                    if (debugID.present())
+                        g_traceBatch.addEvent(
+                            "TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.After");
+                    ASSERT(rep.results.size());
+
+                    state std::vector<KeyRangeLocationInfo> results;
+                    state int shard = 0;
+                    for (; shard < rep.results.size(); shard++) {
+                        // FIXME: these shards are being inserted into the map sequentially, it would be much more CPU
+                        // efficient to save the map pairs and insert them all at once.
+                        results.emplace_back(
+                            (toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys),
+                            cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second));
+                        wait(yield());
+                    }
+                    updateTssMappings(cx, rep);
+                    updateTagMappings(cx, rep);
+
+                    return results;
+                }
+            }
+        } catch (Error& e) {
+            if (e.code() == error_code_commit_proxy_memory_limit_exceeded) {
+                // Eats commit_proxy_memory_limit_exceeded error from commit proxies
+                wait(delay(tr.getBackoff(e.code())));
+                continue;
+            }
+            throw;
+        }
     }
 }
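
The client-side change above is a standard retry-with-exponential-backoff pattern: the overload error is swallowed, the request is retried after a growing delay, and the otherwise-unused `Transaction tr` carries the backoff state, which appears to be why `tr.reset()` is called when the proxy set changes. A minimal standalone sketch of the same pattern in plain C++ (not FDB actor code; `ProxyMemoryLimitExceeded`, `retryWithBackoff`, and the delay constants are illustrative, not FDB APIs):

#include <algorithm>
#include <chrono>
#include <stdexcept>
#include <thread>

// Illustrative stand-in for commit_proxy_memory_limit_exceeded.
struct ProxyMemoryLimitExceeded : std::runtime_error {
    ProxyMemoryLimitExceeded() : std::runtime_error("proxy memory limit exceeded") {}
};

// Retries `op` until it succeeds, sleeping with exponential backoff after
// each overload error instead of surfacing the error to the caller.
template <class Op>
auto retryWithBackoff(Op op) -> decltype(op()) {
    double backoff = 0.01; // seconds; doubles on every failure, capped at 1s (illustrative values)
    for (;;) {
        try {
            return op();
        } catch (const ProxyMemoryLimitExceeded&) {
            std::this_thread::sleep_for(std::chrono::duration<double>(backoff));
            backoff = std::min(backoff * 2.0, 1.0);
        }
    }
}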


@@ -2728,8 +2728,9 @@ ACTOR static Future<Void> readRequestServer(CommitProxyInterface proxy,
         GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
         // WARNING: this code is run at a high priority, so it needs to do as little work as possible
         if (req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && // Always do data distribution requests
-            commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() >
-                SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
+            (commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() >
+                 SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE ||
+             (g_network->isSimulated() && BUGGIFY_WITH_PROB(0.001)))) {
             ++commitData->stats.keyServerLocationErrors;
             req.reply.sendError(commit_proxy_memory_limit_exceeded());
             TraceEvent(SevWarnAlways, "ProxyLocationRequestThresholdExceeded").suppressFor(60);
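
The proxy now rejects a location request either when its queue is genuinely too deep (requests received minus replies sent above a knob) or, in simulation only, at random with probability 0.001, so the new client retry path actually gets exercised in simulation. A hedged sketch of that admission check, with illustrative names standing in for the FDB counters, SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE, and BUGGIFY_WITH_PROB:

#include <cstdint>
#include <random>

struct LocationRequestStats {
    int64_t requestsIn = 0;  // requests received
    int64_t repliesOut = 0;  // replies sent
};

// Illustrative stand-ins for the knob and the buggify probability.
constexpr int64_t kMaxQueueSize = 100000;
constexpr double kInjectProbability = 0.001;

// Returns true if the request should be rejected with the overload error.
bool shouldRejectLocationRequest(const LocationRequestStats& stats, bool simulated, std::mt19937& rng) {
    bool queueTooDeep = stats.requestsIn - stats.repliesOut > kMaxQueueSize;
    std::uniform_real_distribution<double> coin(0.0, 1.0);
    bool injected = simulated && coin(rng) < kInjectProbability; // simulation-only fault injection
    return queueTooDeep || injected;
}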


@@ -7383,6 +7383,23 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
     }
 }
 
+bool fetchKeyCanRetry(const Error& e) {
+    switch (e.code()) {
+    case error_code_end_of_stream:
+    case error_code_connection_failed:
+    case error_code_transaction_too_old:
+    case error_code_future_version:
+    case error_code_process_behind:
+    case error_code_server_overloaded:
+    case error_code_blob_granule_request_failed:
+    case error_code_blob_granule_transaction_too_old:
+    case error_code_commit_proxy_memory_limit_exceeded:
+        return true;
+    default:
+        return false;
+    }
+}
+
 ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
     state const UID fetchKeysID = deterministicRandom()->randomUniqueID();
     state TraceInterval interval("FetchKeys");
@@ -7624,11 +7641,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
                 data->fetchKeysBudgetUsed.set(data->fetchKeysBytesBudget <= 0);
             }
         } catch (Error& e) {
-            if (e.code() != error_code_end_of_stream && e.code() != error_code_connection_failed &&
-                e.code() != error_code_transaction_too_old && e.code() != error_code_future_version &&
-                e.code() != error_code_process_behind && e.code() != error_code_server_overloaded &&
-                e.code() != error_code_blob_granule_request_failed &&
-                e.code() != error_code_blob_granule_transaction_too_old) {
+            if (!fetchKeyCanRetry(e)) {
                 throw;
             }
             lastError = e;
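
These last two hunks replace an inline chain of `e.code() != ...` comparisons with the named whitelist predicate `fetchKeyCanRetry`, so adding a retryable error (like the one this commit adds) becomes a one-line change in a single place. A simplified sketch of how such a predicate is consumed in a retry loop, using a plain enum rather than FDB's `Error` type; `tryFetchBatch` and the error-code names here are hypothetical:

#include <functional>
#include <optional>

enum class ErrorCode { success, end_of_stream, connection_failed, commit_proxy_memory_limit_exceeded, io_error };

// Whitelist of transient errors that the fetch loop may retry.
bool fetchKeyCanRetry(ErrorCode code) {
    switch (code) {
    case ErrorCode::end_of_stream:
    case ErrorCode::connection_failed:
    case ErrorCode::commit_proxy_memory_limit_exceeded:
        return true;
    default:
        return false;
    }
}

// `tryFetchBatch` is a hypothetical fetch step that reports the error it hit.
void fetchLoop(const std::function<ErrorCode()>& tryFetchBatch) {
    std::optional<ErrorCode> lastError; // mirrors the lastError bookkeeping in fetchKeys
    for (;;) {
        ErrorCode e = tryFetchBatch();
        if (e == ErrorCode::success)
            return;
        if (!fetchKeyCanRetry(e))
            throw e;   // unexpected errors still propagate
        lastError = e; // transient errors are recorded and the loop retries
    }
}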