Merge commit 'e318fc260070ba6ba604930b8f259c9b655938ea' into keybackedrangemap
# Conflicts:
#	flow/include/flow/error_definitions.h
commit 7f6d5f296a

@@ -34,6 +34,7 @@
#include "boost/algorithm/string.hpp"
#include "fdbclient/Knobs.h"
#include "flow/CodeProbe.h"
#include "fmt/format.h"

@@ -2042,7 +2043,8 @@ void DatabaseContext::clearFailedEndpointOnHealthyServer(const Endpoint& endpoin
    failedEndpointsOnHealthyServersInfo.erase(endpoint);
}

-Future<Void> DatabaseContext::onProxiesChanged() const {
+Future<Void> DatabaseContext::onProxiesChanged() {
+    backoffDelay = 0.0;
    return this->proxiesChangeTrigger.onTrigger();
}

@@ -2994,27 +2996,41 @@ ACTOR Future<KeyRangeLocationInfo> getKeyLocation_internal(Database cx,
        g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.Before");

    loop {
-       ++cx->transactionKeyServerLocationRequests;
-       choose {
-           when(wait(cx->onProxiesChanged())) {}
-           when(GetKeyServerLocationsReply rep = wait(basicLoadBalance(
-                    cx->getCommitProxies(useProvisionalProxies),
-                    &CommitProxyInterface::getKeyServersLocations,
-                    GetKeyServerLocationsRequest(
-                        span.context, tenant, key, Optional<KeyRef>(), 100, isBackward, version, key.arena()),
-                    TaskPriority::DefaultPromiseEndpoint))) {
-               ++cx->transactionKeyServerLocationRequestsCompleted;
-               if (debugID.present())
-                   g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.After");
-               ASSERT(rep.results.size() == 1);
+       try {
+           wait(cx->getBackoff());
+           ++cx->transactionKeyServerLocationRequests;
+           choose {
+               when(wait(cx->onProxiesChanged())) {}
+               when(GetKeyServerLocationsReply rep = wait(basicLoadBalance(
+                        cx->getCommitProxies(useProvisionalProxies),
+                        &CommitProxyInterface::getKeyServersLocations,
+                        GetKeyServerLocationsRequest(
+                            span.context, tenant, key, Optional<KeyRef>(), 100, isBackward, version, key.arena()),
+                        TaskPriority::DefaultPromiseEndpoint))) {
+                   ++cx->transactionKeyServerLocationRequestsCompleted;
+                   if (debugID.present())
+                       g_traceBatch.addEvent(
+                           "TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocation.After");
+                   ASSERT(rep.results.size() == 1);

-               auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
-               updateTssMappings(cx, rep);
-               updateTagMappings(cx, rep);
+                   auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
+                   updateTssMappings(cx, rep);
+                   updateTagMappings(cx, rep);

-               return KeyRangeLocationInfo(
-                   KeyRange(toPrefixRelativeRange(rep.results[0].first, tenant.prefix), rep.arena), locationInfo);
+                   cx->updateBackoff(success());
+                   return KeyRangeLocationInfo(
+                       KeyRange(toPrefixRelativeRange(rep.results[0].first, tenant.prefix), rep.arena), locationInfo);
                }
            }
+       } catch (Error& e) {
+           if (e.code() == error_code_commit_proxy_memory_limit_exceeded) {
+               // Eats commit_proxy_memory_limit_exceeded error from commit proxies
+               TraceEvent(SevWarnAlways, "CommitProxyOverloadedForKeyLocation").suppressFor(5);
+               cx->updateBackoff(e);
+               continue;
+           }
+
+           throw;
+       }
    }
}

@@ -3102,6 +3118,30 @@ Future<KeyRangeLocationInfo> getKeyLocation(Reference<TransactionState> trState,
                              : latestVersion);
}

+void DatabaseContext::updateBackoff(const Error& err) {
+    switch (err.code()) {
+    case error_code_success:
+        backoffDelay = backoffDelay / CLIENT_KNOBS->BACKOFF_GROWTH_RATE;
+        if (backoffDelay < CLIENT_KNOBS->DEFAULT_BACKOFF) {
+            backoffDelay = 0.0;
+        }
+        break;
+
+    case error_code_commit_proxy_memory_limit_exceeded:
+        ++transactionsResourceConstrained;
+        if (backoffDelay == 0.0) {
+            backoffDelay = CLIENT_KNOBS->DEFAULT_BACKOFF;
+        } else {
+            backoffDelay = std::min(backoffDelay * CLIENT_KNOBS->BACKOFF_GROWTH_RATE,
+                                    CLIENT_KNOBS->RESOURCE_CONSTRAINED_MAX_BACKOFF);
+        }
+        break;
+
+    default:
+        ASSERT_WE_THINK(false);
+    }
+}
+
ACTOR Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations_internal(
    Database cx,
    TenantInfo tenant,

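The new DatabaseContext::updateBackoff() is a bounded exponential backoff: every commit_proxy_memory_limit_exceeded error grows backoffDelay by BACKOFF_GROWTH_RATE (starting from DEFAULT_BACKOFF, capped at RESOURCE_CONSTRAINED_MAX_BACKOFF), and every success shrinks it by the same factor until it drops below DEFAULT_BACKOFF and snaps back to zero. A minimal standalone sketch of that arithmetic follows; the numeric knob values are illustrative assumptions, not the actual CLIENT_KNOBS defaults.

#include <algorithm>
#include <cstdio>

// Illustrative stand-ins for the CLIENT_KNOBS values; the real defaults may differ.
constexpr double kDefaultBackoff = 0.01;
constexpr double kGrowthRate = 2.0;
constexpr double kMaxBackoff = 30.0;

struct BackoffSketch {
    double delay = 0.0;
    void onOverloaded() { // mirrors the commit_proxy_memory_limit_exceeded case
        delay = (delay == 0.0) ? kDefaultBackoff : std::min(delay * kGrowthRate, kMaxBackoff);
    }
    void onSuccess() { // mirrors the error_code_success case
        delay /= kGrowthRate;
        if (delay < kDefaultBackoff)
            delay = 0.0;
    }
};

int main() {
    BackoffSketch b;
    for (int i = 0; i < 5; ++i)
        b.onOverloaded(); // 0.01 -> 0.02 -> 0.04 -> 0.08 -> 0.16
    std::printf("after 5 overload errors: %.2f s\n", b.delay);
    b.onSuccess(); // halves back toward zero
    std::printf("after 1 success: %.2f s\n", b.delay);
}
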
@@ -3117,35 +3157,50 @@ ACTOR Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations_internal(
        g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before");

    loop {
-       ++cx->transactionKeyServerLocationRequests;
-       choose {
-           when(wait(cx->onProxiesChanged())) {}
-           when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
-                    cx->getCommitProxies(useProvisionalProxies),
-                    &CommitProxyInterface::getKeyServersLocations,
-                    GetKeyServerLocationsRequest(
-                        span.context, tenant, keys.begin, keys.end, limit, reverse, version, keys.arena()),
-                    TaskPriority::DefaultPromiseEndpoint))) {
-               ++cx->transactionKeyServerLocationRequestsCompleted;
-               state GetKeyServerLocationsReply rep = _rep;
-               if (debugID.present())
-                   g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.After");
-               ASSERT(rep.results.size());
+       try {
+           wait(cx->getBackoff());
+           ++cx->transactionKeyServerLocationRequests;
+           choose {
+               when(wait(cx->onProxiesChanged())) {}
+               when(GetKeyServerLocationsReply _rep = wait(basicLoadBalance(
+                        cx->getCommitProxies(useProvisionalProxies),
+                        &CommitProxyInterface::getKeyServersLocations,
+                        GetKeyServerLocationsRequest(
+                            span.context, tenant, keys.begin, keys.end, limit, reverse, version, keys.arena()),
+                        TaskPriority::DefaultPromiseEndpoint))) {
+                   ++cx->transactionKeyServerLocationRequestsCompleted;
+                   state GetKeyServerLocationsReply rep = _rep;
+                   if (debugID.present())
+                       g_traceBatch.addEvent(
+                           "TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.After");
+                   ASSERT(rep.results.size());

-               state std::vector<KeyRangeLocationInfo> results;
-               state int shard = 0;
-               for (; shard < rep.results.size(); shard++) {
-                   // FIXME: these shards are being inserted into the map sequentially, it would be much more CPU
-                   // efficient to save the map pairs and insert them all at once.
-                   results.emplace_back((toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys),
-                                        cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second));
-                   wait(yield());
+                   state std::vector<KeyRangeLocationInfo> results;
+                   state int shard = 0;
+                   for (; shard < rep.results.size(); shard++) {
+                       // FIXME: these shards are being inserted into the map sequentially, it would be much more CPU
+                       // efficient to save the map pairs and insert them all at once.
+                       results.emplace_back(
+                           (toPrefixRelativeRange(rep.results[shard].first, tenant.prefix) & keys),
+                           cx->setCachedLocation(rep.results[shard].first, rep.results[shard].second));
+                       wait(yield());
                    }
-               updateTssMappings(cx, rep);
-               updateTagMappings(cx, rep);
-
-               return results;
-           }
+                   updateTssMappings(cx, rep);
+                   updateTagMappings(cx, rep);
+
+                   cx->updateBackoff(success());
+                   return results;
+               }
            }
+       } catch (Error& e) {
+           if (e.code() == error_code_commit_proxy_memory_limit_exceeded) {
+               // Eats commit_proxy_memory_limit_exceeded error from commit proxies
+               TraceEvent(SevWarnAlways, "CommitProxyOverloadedForRangeLocation").suppressFor(5);
+               cx->updateBackoff(e);
+               continue;
+           }
+
+           throw;
+       }
    }
}

@@ -3388,16 +3443,30 @@ SpanContext generateSpanID(bool transactionTracingSample, SpanContext parentCont

ACTOR Future<int64_t> lookupTenantImpl(DatabaseContext* cx, TenantName tenant) {
    loop {
-       ++cx->transactionTenantLookupRequests;
-       choose {
-           when(wait(cx->onProxiesChanged())) {}
-           when(GetTenantIdReply rep = wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
-                                                             &CommitProxyInterface::getTenantId,
-                                                             GetTenantIdRequest(tenant, latestVersion),
-                                                             TaskPriority::DefaultPromiseEndpoint))) {
-               ++cx->transactionTenantLookupRequestsCompleted;
-               return rep.tenantId;
+       try {
+           wait(cx->getBackoff());
+
+           ++cx->transactionTenantLookupRequests;
+           choose {
+               when(wait(cx->onProxiesChanged())) {}
+               when(GetTenantIdReply rep = wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
+                                                                 &CommitProxyInterface::getTenantId,
+                                                                 GetTenantIdRequest(tenant, latestVersion),
+                                                                 TaskPriority::DefaultPromiseEndpoint))) {
+                   ++cx->transactionTenantLookupRequestsCompleted;
+                   cx->updateBackoff(success());
+                   return rep.tenantId;
+               }
            }
+       } catch (Error& e) {
+           if (e.code() == error_code_commit_proxy_memory_limit_exceeded) {
+               TraceEvent(SevWarnAlways, "CommitProxyOverloadedForTenant").suppressFor(5);
+               // Eats commit_proxy_memory_limit_exceeded error from commit proxies
+               cx->updateBackoff(e);
+               continue;
+           }
+
+           throw;
+       }
    }
}

@@ -286,7 +286,7 @@ public:
    Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(UseProvisionalProxies useProvisionalProxies);
    Reference<GrvProxyInfo> getGrvProxies(UseProvisionalProxies useProvisionalProxies);
    bool isCurrentGrvProxy(UID proxyId) const;
-   Future<Void> onProxiesChanged() const;
+   Future<Void> onProxiesChanged();
    Future<HealthMetrics> getHealthMetrics(bool detailed);
    // Get storage stats of a storage server from the cached healthy metrics if now() - lastUpdate < maxStaleness.
    // Otherwise, ask GRVProxy for the up-to-date health metrics.

@@ -763,6 +763,14 @@ public:
    // { "InitializationError" : <error code> }
    Standalone<StringRef> getClientStatus();

+   // Gets a database level backoff delay future, time in seconds.
+   Future<Void> getBackoff() const { return backoffDelay > 0.0 ? delay(backoffDelay) : Future<Void>(Void()); }
+
+   // Updates internal Backoff state when a request fails or succeeds.
+   // E.g., commit_proxy_memory_limit_exceeded error means the database is overloaded
+   // and the client should back off more significantly than transaction-level errors.
+   void updateBackoff(const Error& err);
+
private:
    using WatchMapKey = std::pair<int64_t, Key>;
    using WatchMapKeyHasher = boost::hash<WatchMapKey>;

@@ -784,6 +792,7 @@ private:
    using WatchCounterMap_t = std::unordered_map<WatchMapKey, WatchCounterMapValue, WatchMapKeyHasher>;
    // Maps the number of the WatchMapKey being used.
    WatchCounterMap_t watchCounterMap;
+   double backoffDelay = 0.0;
};

// Similar to tr.onError(), but doesn't require a DatabaseContext.

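In the DatabaseContext.h changes above, getBackoff() turns the stored delay into a future and updateBackoff() adjusts it; every client call site in this diff (getKeyLocation_internal, getKeyRangeLocations_internal, lookupTenantImpl) wraps its proxy request in the same loop. A condensed sketch of that pattern follows; doRequest and Reply are hypothetical stand-ins for the real proxy request and reply types, not FDB APIs.

// Condensed sketch of the call-site pattern around getBackoff()/updateBackoff().
// doRequest/Reply are placeholders for the actual basicLoadBalance() call and reply type.
ACTOR Future<Reply> requestWithBackoff(DatabaseContext* cx) {
    loop {
        try {
            wait(cx->getBackoff()); // immediate Void() while backoffDelay == 0.0
            Reply rep = wait(doRequest(cx)); // the proxy request, e.g. getKeyServersLocations
            cx->updateBackoff(success()); // shrink the delay back toward zero
            return rep;
        } catch (Error& e) {
            if (e.code() == error_code_commit_proxy_memory_limit_exceeded) {
                cx->updateBackoff(e); // grow the delay, capped by the knob maximum
                continue; // retry after the longer backoff
            }
            throw;
        }
    }
}
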
@@ -2632,7 +2632,8 @@ ACTOR static Future<Void> tenantIdServer(CommitProxyInterface proxy,
        GetTenantIdRequest req = waitNext(proxy.getTenantId.getFuture());
        // WARNING: this code is run at a high priority, so it needs to do as little work as possible
        if (commitData->stats.tenantIdRequestIn.getValue() - commitData->stats.tenantIdRequestOut.getValue() >
-           SERVER_KNOBS->TENANT_ID_REQUEST_MAX_QUEUE_SIZE) {
+           SERVER_KNOBS->TENANT_ID_REQUEST_MAX_QUEUE_SIZE ||
+           (g_network->isSimulated() && !g_simulator->speedUpSimulation && BUGGIFY_WITH_PROB(0.0001))) {
            ++commitData->stats.tenantIdRequestErrors;
            req.reply.sendError(commit_proxy_memory_limit_exceeded());
            TraceEvent(SevWarnAlways, "ProxyGetTenantRequestThresholdExceeded").suppressFor(60);

@@ -2728,8 +2729,9 @@ ACTOR static Future<Void> readRequestServer(CommitProxyInterface proxy,
        GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
        // WARNING: this code is run at a high priority, so it needs to do as little work as possible
        if (req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && // Always do data distribution requests
-           commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() >
-               SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
+           (commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() >
+                SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE ||
+            (g_network->isSimulated() && BUGGIFY_WITH_PROB(0.001)))) {
            ++commitData->stats.keyServerLocationErrors;
            req.reply.sendError(commit_proxy_memory_limit_exceeded());
            TraceEvent(SevWarnAlways, "ProxyLocationRequestThresholdExceeded").suppressFor(60);

@@ -1814,18 +1814,21 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
                    CODE_PROBE(true, "Data distributor received a duplicate ongoing snapshot request");
                    TraceEvent("RetryOngoingDistributorSnapRequest").detail("SnapUID", snapUID);
                    ASSERT(snapReq.snapPayload == ddSnapReqMap[snapUID].snapPayload);
+                   // Discard the old request if a duplicate new request is received
+                   ddSnapReqMap[snapUID].reply.sendError(duplicate_snapshot_request());
                    ddSnapReqMap[snapUID] = snapReq;
                } else {
                    ddSnapReqMap[snapUID] = snapReq;
-                   actors.add(ddSnapCreate(
-                       snapReq, db, self->context->ddEnabledState.get(), &ddSnapReqMap, &ddSnapReqResultMap));
                    auto* ddSnapReqResultMapPtr = &ddSnapReqResultMap;
                    actors.add(fmap(
                        [ddSnapReqResultMapPtr, snapUID](Void _) {
                            ddSnapReqResultMapPtr->erase(snapUID);
                            return Void();
                        },
-                       delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
+                       delayed(
+                           ddSnapCreate(
+                               snapReq, db, self->context->ddEnabledState.get(), &ddSnapReqMap, &ddSnapReqResultMap),
+                           SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
                }
            }
            when(DistributorExclusionSafetyCheckRequest exclCheckReq =

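Swapping delay() for delayed() in the hunk above changes when the result-map entry is dropped: previously the erase fired SNAP_MINIMUM_TIME_GAP seconds after the request arrived, whereas now it fires that long after ddSnapCreate itself completes (the worker.actor.cpp hunk below makes the same change around workerSnapCreate). The sketch below shows what a delayed()-style combinator does, under the assumption that flow's delayed() waits for the wrapped future and then adds the extra delay; the name is hypothetical so it is not mistaken for the real implementation.

// Sketch only (assumed semantics): wait for `what`, then hold its result for another `time` seconds.
ACTOR template <class T>
Future<T> delayedSketch(Future<T> what, double time) {
    T result = wait(what);
    wait(delay(time));
    return result;
}
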
@@ -2485,13 +2485,15 @@ ACTOR static Future<JsonBuilderObject> blobGranulesStatusFetcher(
        // Mutation log backup
        state std::string mlogsUrl = wait(getMutationLogUrl());
        statusObj["mutation_log_location"] = mlogsUrl;
-       state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {});
-       BackupDescription desc = wait(timeoutError(bc->describeBackup(), 2.0));
-       if (desc.contiguousLogEnd.present()) {
-           statusObj["mutation_log_end_version"] = desc.contiguousLogEnd.get();
-       }
-       if (desc.minLogBegin.present()) {
-           statusObj["mutation_log_begin_version"] = desc.minLogBegin.get();
+       if (mlogsUrl != "") {
+           state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {});
+           BackupDescription desc = wait(timeoutError(bc->describeBackup(), 2.0));
+           if (desc.contiguousLogEnd.present()) {
+               statusObj["mutation_log_end_version"] = desc.contiguousLogEnd.get();
+           }
+           if (desc.minLogBegin.present()) {
+               statusObj["mutation_log_begin_version"] = desc.minLogBegin.get();
+           }
        }
    } catch (Error& e) {
        if (e.code() == error_code_actor_cancelled)

@@ -7383,6 +7383,24 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
    }
}

+bool fetchKeyCanRetry(const Error& e) {
+   switch (e.code()) {
+   case error_code_end_of_stream:
+   case error_code_connection_failed:
+   case error_code_transaction_too_old:
+   case error_code_future_version:
+   case error_code_process_behind:
+   case error_code_server_overloaded:
+   case error_code_blob_granule_request_failed:
+   case error_code_blob_granule_transaction_too_old:
+   case error_code_grv_proxy_memory_limit_exceeded:
+   case error_code_commit_proxy_memory_limit_exceeded:
+       return true;
+   default:
+       return false;
+   }
+}
+
ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
    state const UID fetchKeysID = deterministicRandom()->randomUniqueID();
    state TraceInterval interval("FetchKeys");

@@ -7624,11 +7642,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
                data->fetchKeysBudgetUsed.set(data->fetchKeysBytesBudget <= 0);
            }
        } catch (Error& e) {
-           if (e.code() != error_code_end_of_stream && e.code() != error_code_connection_failed &&
-               e.code() != error_code_transaction_too_old && e.code() != error_code_future_version &&
-               e.code() != error_code_process_behind && e.code() != error_code_server_overloaded &&
-               e.code() != error_code_blob_granule_request_failed &&
-               e.code() != error_code_blob_granule_transaction_too_old) {
+           if (!fetchKeyCanRetry(e)) {
                throw;
            }
            lastError = e;

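Besides being easier to read, the fetchKeyCanRetry() predicate widens the retry set: grv_proxy_memory_limit_exceeded and commit_proxy_memory_limit_exceeded now count as retryable during a shard fetch, which the old chained comparison above did not allow. A small illustrative check against the function added in this diff follows; the surrounding helper is scaffolding for this write-up, not code from the FDB tree.

// Scaffolding for illustration only; exercises the classification added above.
void sanityCheckRetryClassification() {
    ASSERT(fetchKeyCanRetry(commit_proxy_memory_limit_exceeded())); // newly retryable
    ASSERT(fetchKeyCanRetry(transaction_too_old()));                // retryable before and after
    ASSERT(!fetchKeyCanRetry(io_error()));                          // anything else still rethrows
}
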
@@ -1106,7 +1106,7 @@ ACTOR Future<DistributedTestResults> runWorkload(Database cx,
    }

    state std::vector<Future<ErrorOr<CheckReply>>> checks;
-   TraceEvent("CheckingResults").log();
+   TraceEvent("TestCheckingResults").detail("WorkloadTitle", spec.title);

    printf("checking test (%s)...\n", printable(spec.title).c_str());

@@ -1123,6 +1123,7 @@ ACTOR Future<DistributedTestResults> runWorkload(Database cx,
            else
                failure++;
        }
+       TraceEvent("TestCheckComplete").detail("WorkloadTitle", spec.title);
    }

    if (spec.phases & TestWorkload::METRICS) {

@@ -3146,30 +3146,35 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                        .detail("Role", snapReq.role);
                    ASSERT(snapReq.role == snapReqMap[snapReqKey].role);
                    ASSERT(snapReq.snapPayload == snapReqMap[snapReqKey].snapPayload);
+                   // Discard the old request if a duplicate new request is received
+                   // In theory, the old request should be discarded when we send this error since DD won't resend a
+                   // request unless for a network error, where the old request is discarded before sending the
+                   // duplicate request.
+                   snapReqMap[snapReqKey].reply.sendError(duplicate_snapshot_request());
                    snapReqMap[snapReqKey] = snapReq;
                } else {
                    snapReqMap[snapReqKey] = snapReq; // set map point to the request
                    if (g_network->isSimulated() && (now() - lastSnapTime) < SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP) {
-                       // only allow duplicate snapshots on same process in a short time for different roles
-                       auto okay = (lastSnapReq.snapUID == snapReq.snapUID) && lastSnapReq.role != snapReq.role;
+                       // duplicate snapshots on the same process for the same role is not allowed
+                       auto okay = lastSnapReq.snapUID != snapReq.snapUID || lastSnapReq.role != snapReq.role;
                        TraceEvent(okay ? SevInfo : SevError, "RapidSnapRequestsOnSameProcess")
-                           .detail("CurrSnapUID", snapReqKey)
+                           .detail("CurrSnapUID", snapReq.snapUID)
                            .detail("PrevSnapUID", lastSnapReq.snapUID)
                            .detail("CurrRole", snapReq.role)
                            .detail("PrevRole", lastSnapReq.role)
                            .detail("GapTime", now() - lastSnapTime);
                    }
-                   errorForwarders.add(workerSnapCreate(snapReq,
-                                                        snapReq.role.toString() == "coord" ? coordFolder : folder,
-                                                        &snapReqMap,
-                                                        &snapReqResultMap));
                    auto* snapReqResultMapPtr = &snapReqResultMap;
                    errorForwarders.add(fmap(
                        [snapReqResultMapPtr, snapReqKey](Void _) {
                            snapReqResultMapPtr->erase(snapReqKey);
                            return Void();
                        },
-                       delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
+                       delayed(workerSnapCreate(snapReq,
+                                                snapReq.role.toString() == "coord" ? coordFolder : folder,
+                                                &snapReqMap,
+                                                &snapReqResultMap),
+                               SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
                    if (g_network->isSimulated()) {
                        lastSnapReq = snapReq;
                        lastSnapTime = now();

@@ -518,7 +518,7 @@ struct AuthzSecurityWorkload : TestWorkload {
        state Version v2 = wait(setAndCommitKeyValueAndGetVersion(
            self, cx, self->anotherTenant, self->signedTokenAnotherTenant, key, value));

-       {
+       try {
            GetKeyServerLocationsReply rep =
                wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
                                      &CommitProxyInterface::getKeyServersLocations,

@@ -542,8 +542,13 @@ struct AuthzSecurityWorkload : TestWorkload {
                        .detail("LeakingRangeEnd", range.end.printable());
                }
            }
+       } catch (Error& e) {
+           if (e.code() == error_code_operation_cancelled) {
+               throw e;
+           }
+           ASSERT(e.code() == error_code_commit_proxy_memory_limit_exceeded);
+       }
-       {
+       try {
            GetKeyServerLocationsReply rep = wait(basicLoadBalance(
                cx->getCommitProxies(UseProvisionalProxies::False),
                &CommitProxyInterface::getKeyServersLocations,

@@ -567,6 +572,11 @@ struct AuthzSecurityWorkload : TestWorkload {
                        .detail("LeakingRangeEnd", range.end.printable());
                }
            }
+       } catch (Error& e) {
+           if (e.code() == error_code_operation_cancelled) {
+               throw e;
+           }
+           ASSERT(e.code() == error_code_commit_proxy_memory_limit_exceeded);
+       }
        ++self->keyLocationLeakNegative;

@@ -24,6 +24,7 @@
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/SimpleIni.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/Status.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"

@@ -58,7 +59,8 @@ public: // ctor & dtor
        maxSnapDelay = getOption(options, "maxSnapDelay"_sr, 25.0);
        testID = getOption(options, "testID"_sr, 0);
        restartInfoLocation = getOption(options, "restartInfoLocation"_sr, "simfdb/restartInfo.ini"_sr).toString();
-       retryLimit = getOption(options, "retryLimit"_sr, 5);
+       // default behavior is to retry until success
+       retryLimit = getOption(options, "retryLimit"_sr, -1);
        g_simulator->allowLogSetKills = false;
        {
            double duplicateSnapshotProbability = getOption(options, "duplicateSnapshotProbability"_sr, 0.1);

@@ -94,6 +96,8 @@ public: // workload functions

    void getMetrics(std::vector<PerfMetric>& m) override { TraceEvent("SnapTestWorkloadGetMetrics"); }

+   void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("all"); }
+
    ACTOR Future<Void> _create_keys(Database cx, std::string prefix, bool even = true) {
        state Transaction tr(cx);
        state std::vector<int64_t> keys;

@@ -157,17 +161,29 @@ public: // workload functions
                    wait(delay(deterministicRandom()->random01()));
                    duplicateSnapStatus = snapCreate(cx, snapCmdRef, self->snapUID);
                }
-               wait(status);
+               ErrorOr<Void> statusErr = wait(errorOr(status));
+               if (statusErr.isError() && statusErr.getError().code() != error_code_duplicate_snapshot_request) {
+                   // First request is expected to fail with duplicate_snapshot_request error
+                   // Any other errors should be thrown
+                   throw statusErr.getError();
+               }
+               if (self->attemptDuplicateSnapshot) {
+                   // If duplicate, the first request is discarded, wait for the latest one
+                   wait(duplicateSnapStatus);
+               }
                break;
            } catch (Error& e) {
-               TraceEvent("SnapCreateError").error(e);
+               TraceEvent("SnapTestCreateError")
+                   .error(e)
+                   .detail("SnapUID", self->snapUID)
+                   .detail("Duplicate", self->attemptDuplicateSnapshot);
                ++retry;
-               // snap v2 can fail for many reasons, so retry for 5 times and then fail it
+               // snap v2 can fail for many reasons, so retry until specified times and then fail it
                if (self->retryLimit != -1 && retry > self->retryLimit) {
                    snapFailed = true;
                    break;
                }
-               wait(delay(5.0));
+               wait(delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP));
            }
        }
        CSimpleIni ini;

@@ -104,7 +104,8 @@ ERROR( blob_granule_request_failed, 1079, "BlobGranule request failed" )
ERROR( storage_too_many_feed_streams, 1080, "Too many feed streams to a single storage server" )
ERROR( storage_engine_not_initialized, 1081, "Storage engine was never successfully initialized." )
ERROR( unknown_storage_engine, 1082, "Storage engine type is not recognized." )
-ERROR( dd_config_changed, 1083, "DataDistribution configuration changed." )
+ERROR( duplicate_snapshot_request, 1083, "A duplicate snapshot request has been sent, the old request is discarded.")
+ERROR( dd_config_changed, 1084, "DataDistribution configuration changed." )

ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )