Client lazily fetches health metrics from proxies
This commit is contained in:
parent
9ee72261c4
commit
56ae46f89e
|
@ -65,6 +65,7 @@ public:
|
|||
Reference<ProxyInfo> getMasterProxies();
|
||||
Future<Reference<ProxyInfo>> getMasterProxiesFuture();
|
||||
Future<Void> onMasterProxiesChanged();
|
||||
Future<HealthMetrics> getHealthMetrics(bool detailed);
|
||||
|
||||
// Update the watch counter for the database
|
||||
void addWatch();
|
||||
|
@ -161,7 +162,8 @@ public:
|
|||
int apiVersion;
|
||||
|
||||
HealthMetrics healthMetrics;
|
||||
Future<Void> updateHealthMetrics;
|
||||
double healthMetricsLastUpdated;
|
||||
double detailedHealthMetricsLastUpdated;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -70,8 +70,8 @@ ClientKnobs::ClientKnobs(bool randomize) {
|
|||
init( STORAGE_METRICS_SHARD_LIMIT, 100 ); if( randomize && BUGGIFY ) STORAGE_METRICS_SHARD_LIMIT = 3;
|
||||
init( STORAGE_METRICS_UNFAIR_SPLIT_LIMIT, 2.0/3.0 );
|
||||
init( STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, 15.0 );
|
||||
init( UPDATE_HEALTH_METRICS_INTERVAL, 0.5 );
|
||||
init( UPDATE_DETAILED_HEALTH_METRICS_INTERVAL, 5.0 );
|
||||
init( AGGREGATE_HEALTH_METRICS_MAX_STALENESS, 0.5 );
|
||||
init( DETAILED_HEALTH_METRICS_MAX_STALENESS, 5.0 );
|
||||
|
||||
//KeyRangeMap
|
||||
init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;
|
||||
|
|
|
@ -69,8 +69,8 @@ public:
|
|||
int STORAGE_METRICS_SHARD_LIMIT;
|
||||
double STORAGE_METRICS_UNFAIR_SPLIT_LIMIT;
|
||||
double STORAGE_METRICS_TOO_MANY_SHARDS_DELAY;
|
||||
double UPDATE_HEALTH_METRICS_INTERVAL;
|
||||
double UPDATE_DETAILED_HEALTH_METRICS_INTERVAL;
|
||||
double AGGREGATE_HEALTH_METRICS_MAX_STALENESS;
|
||||
double DETAILED_HEALTH_METRICS_MAX_STALENESS;
|
||||
|
||||
//KeyRangeMap
|
||||
int KRM_GET_RANGE_LIMIT;
|
||||
|
|
|
@ -466,32 +466,47 @@ ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDB
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> updateHealthMetricsActor(DatabaseContext *cx) {
|
||||
state bool sendDetailedHealthMetrics = networkOptions.sendDetailedHealthMetrics;
|
||||
state double lastDetailed = 0;
|
||||
ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bool detailed) {
|
||||
if (now() - cx->healthMetricsLastUpdated < CLIENT_KNOBS->AGGREGATE_HEALTH_METRICS_MAX_STALENESS) {
|
||||
if (detailed) {
|
||||
TraceEvent("SENDING_CACHED_DETAILED_METRICS");
|
||||
return cx->healthMetrics;
|
||||
}
|
||||
else {
|
||||
HealthMetrics result;
|
||||
result.update(cx->healthMetrics, false, false);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
state bool sendDetailedRequest = detailed && now() - cx->detailedHealthMetricsLastUpdated >
|
||||
CLIENT_KNOBS->DETAILED_HEALTH_METRICS_MAX_STALENESS;
|
||||
loop {
|
||||
wait( delay(CLIENT_KNOBS->UPDATE_HEALTH_METRICS_INTERVAL) );
|
||||
state bool sendDetailed = networkOptions.sendDetailedHealthMetrics && now() - lastDetailed > CLIENT_KNOBS->UPDATE_DETAILED_HEALTH_METRICS_INTERVAL;
|
||||
loop {
|
||||
choose {
|
||||
when(wait(cx->onMasterProxiesChanged())) {}
|
||||
when(state GetHealthMetricsReply rep =
|
||||
wait(cx->getMasterProxies().isValid() && cx->getMasterProxies()->size() ?
|
||||
loadBalance(cx->getMasterProxies(),
|
||||
&MasterProxyInterface::getHealthMetrics,
|
||||
GetHealthMetricsRequest(sendDetailed)) :
|
||||
Never())) {
|
||||
cx->healthMetrics.update(rep.healthMetrics, sendDetailed, true);
|
||||
break;
|
||||
choose {
|
||||
when(wait(cx->onMasterProxiesChanged())) {}
|
||||
when(GetHealthMetricsReply rep =
|
||||
wait(loadBalance(cx->getMasterProxies(), &MasterProxyInterface::getHealthMetrics,
|
||||
GetHealthMetricsRequest(sendDetailedRequest)))) {
|
||||
cx->healthMetrics.update(rep.healthMetrics, detailed, true);
|
||||
if (detailed) {
|
||||
cx->healthMetricsLastUpdated = now();
|
||||
cx->detailedHealthMetricsLastUpdated = now();
|
||||
return cx->healthMetrics;
|
||||
}
|
||||
else {
|
||||
cx->healthMetricsLastUpdated = now();
|
||||
HealthMetrics result;
|
||||
result.update(cx->healthMetrics, false, false);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
if(sendDetailed) {
|
||||
lastDetailed = now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
|
||||
return getHealthMetricsActor(this, detailed);
|
||||
}
|
||||
|
||||
DatabaseContext::DatabaseContext(
|
||||
Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId,
|
||||
int taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, int apiVersion )
|
||||
|
@ -500,7 +515,8 @@ DatabaseContext::DatabaseContext(
|
|||
transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0),
|
||||
transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0),
|
||||
transactionsMaybeCommitted(0), transactionsResourceConstrained(0), outstandingWatches(0),
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000)
|
||||
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
|
||||
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0)
|
||||
{
|
||||
maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES;
|
||||
|
||||
|
@ -514,8 +530,6 @@ DatabaseContext::DatabaseContext(
|
|||
|
||||
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
|
||||
clientStatusUpdater.actor = clientStatusUpdateActor(this);
|
||||
|
||||
updateHealthMetrics = updateHealthMetricsActor(this);
|
||||
}
|
||||
|
||||
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) {}
|
||||
|
@ -938,17 +952,6 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
|
|||
validateOptionValue(value, false);
|
||||
networkOptions.slowTaskProfilingEnabled = true;
|
||||
break;
|
||||
case FDBNetworkOptions::SEND_DETAILED_HEALTH_METRICS:
|
||||
validateOptionValue(value, true);
|
||||
int sendDetailedHealthMetrics;
|
||||
try {
|
||||
sendDetailedHealthMetrics = std::stoi(value.get().toString());
|
||||
} catch (...) {
|
||||
TraceEvent(SevWarnAlways, "InvalidDetailedMetricsOptionValue").detail("Value", value.get().toString());
|
||||
throw invalid_option_value();
|
||||
}
|
||||
networkOptions.sendDetailedHealthMetrics = (sendDetailedHealthMetrics > 0);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -56,13 +56,12 @@ struct NetworkOptions {
|
|||
Optional<bool> logClientInfo;
|
||||
Standalone<VectorRef<ClientVersionRef>> supportedVersions;
|
||||
bool slowTaskProfilingEnabled;
|
||||
bool sendDetailedHealthMetrics;
|
||||
|
||||
// The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE, are located in Trace.h.
|
||||
NetworkOptions()
|
||||
: localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
|
||||
traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
|
||||
traceFormat("xml"), slowTaskProfilingEnabled(false), sendDetailedHealthMetrics(false) {}
|
||||
traceFormat("xml"), slowTaskProfilingEnabled(false) {}
|
||||
};
|
||||
|
||||
class Database {
|
||||
|
|
|
@ -107,9 +107,6 @@ description is not currently required but encouraged.
|
|||
description="Disables logging of client statistics, such as sampled transaction activity." />
|
||||
<Option name="enable_slow_task_profiling" code="71"
|
||||
description="Enables debugging feature to perform slow task profiling. Requires trace logging to be enabled. WARNING: this feature is not recommended for use in production." />
|
||||
<Option name="send_detailed_health_metrics" code="72"
|
||||
paramType="Int" paramDescription="Positive value enables sending of detailed health metrics"
|
||||
description="Send per-process map of health metrics to client"/>
|
||||
<Option name="supported_client_versions" code="1000"
|
||||
paramType="String" paramDescription="[release version],[source version],[protocol version];..."
|
||||
description="This option is set automatically to communicate the list of supported clients to the active client."
|
||||
|
|
|
@ -106,14 +106,15 @@ struct ThrottlingWorkload : KVWorkload {
|
|||
|
||||
loop {
|
||||
wait(delay(self->healthMetricsCheckInterval));
|
||||
if (healthMetrics == cx->healthMetrics)
|
||||
HealthMetrics newHealthMetrics = wait(cx->getHealthMetrics(self->sendDetailedHealthMetrics));
|
||||
if (healthMetrics == newHealthMetrics)
|
||||
{
|
||||
if (++repeated > self->maxAllowedStaleness / self->healthMetricsCheckInterval)
|
||||
self->healthMetricsStoppedUpdating = true;
|
||||
}
|
||||
else
|
||||
repeated = 0;
|
||||
healthMetrics = cx->healthMetrics;
|
||||
healthMetrics = newHealthMetrics;
|
||||
|
||||
self->tokenBucket.transactionRate = healthMetrics.tpsLimit * self->throttlingMultiplier / self->clientCount;
|
||||
self->worstStorageQueue = std::max(self->worstStorageQueue, healthMetrics.worstStorageQueue);
|
||||
|
@ -176,11 +177,9 @@ struct ThrottlingWorkload : KVWorkload {
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> _setup(Database cx, ThrottlingWorkload* self) {
|
||||
Standalone<StringRef> value(format("%d", self->sendDetailedHealthMetrics ? 1 : 0));
|
||||
setNetworkOption(FDBNetworkOptions::SEND_DETAILED_HEALTH_METRICS, Optional<StringRef>(value));
|
||||
if (!self->sendDetailedHealthMetrics) {
|
||||
// Clear detailed health metrics that are already populated
|
||||
wait(delay(2 * CLIENT_KNOBS->UPDATE_DETAILED_HEALTH_METRICS_INTERVAL));
|
||||
wait(delay(2 * CLIENT_KNOBS->DETAILED_HEALTH_METRICS_MAX_STALENESS));
|
||||
cx->healthMetrics.storageStats.clear();
|
||||
cx->healthMetrics.tLogQueue.clear();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue