Fix cases where latency band config could be discarded during recovery or process start.

This commit is contained in:
A.J. Beamon 2019-11-20 11:44:18 -08:00
parent 3c2d849c8d
commit 7c801513e2
2 changed files with 32 additions and 27 deletions

View File

@ -1335,6 +1335,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
dbInfo.clusterInterface = db->serverInfo->get().read().clusterInterface;
dbInfo.distributor = db->serverInfo->get().read().distributor;
dbInfo.ratekeeper = db->serverInfo->get().read().ratekeeper;
dbInfo.latencyBandConfig = db->serverInfo->get().read().latencyBandConfig;
TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
db->serverInfo->set( cachedInfo );

View File

@ -258,6 +258,34 @@ struct ProxyCommitData {
return tags;
}
void updateLatencyBandConfig(Optional<LatencyBandConfig> newLatencyBandConfig) {
if(newLatencyBandConfig.present() != latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != latencyBandConfig.get().grvConfig))
{
TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
stats.grvLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().grvConfig.bands) {
stats.grvLatencyBands.addThreshold(band);
}
}
}
if(newLatencyBandConfig.present() != latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().commitConfig != latencyBandConfig.get().commitConfig))
{
TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
stats.commitLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().commitConfig.bands) {
stats.commitLatencyBands.addThreshold(band);
}
}
}
latencyBandConfig = newLatencyBandConfig;
}
ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit, Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
logAdapter(NULL), txnStateStore(NULL), popRemoteTxs(false),
@ -1603,6 +1631,8 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
createWhitelistBinPathVec(whitelistBinPaths, commitData.whitelistedBinPathVec);
commitData.updateLatencyBandConfig(commitData.db->get().latencyBandConfig);
// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only a approximate formula for limiting the memory used.
// COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR is an estimate based on experiments and not an accurate one.
state int64_t commitBatchesMemoryLimit = std::min(SERVER_KNOBS->COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, static_cast<int64_t>((SERVER_KNOBS->SERVER_MEM_LIMIT * SERVER_KNOBS->COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / SERVER_KNOBS->COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR));
@ -1638,33 +1668,7 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.logSystem->popTxs(commitData.lastTxsPop, tagLocalityRemoteLog);
}
Optional<LatencyBandConfig> newLatencyBandConfig = commitData.db->get().latencyBandConfig;
if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != commitData.latencyBandConfig.get().grvConfig))
{
TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
commitData.stats.grvLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().grvConfig.bands) {
commitData.stats.grvLatencyBands.addThreshold(band);
}
}
}
if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().commitConfig != commitData.latencyBandConfig.get().commitConfig))
{
TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
commitData.stats.commitLatencyBands.clearBands();
if(newLatencyBandConfig.present()) {
for(auto band : newLatencyBandConfig.get().commitConfig.bands) {
commitData.stats.commitLatencyBands.addThreshold(band);
}
}
}
commitData.latencyBandConfig = newLatencyBandConfig;
commitData.updateLatencyBandConfig(commitData.db->get().latencyBandConfig);
}
when(wait(onError)) {}
when(std::pair<vector<CommitTransactionRequest>, int> batchedRequests = waitNext(batchedCommits.getFuture())) {