onMasterProxiesChanged was being triggered when any member of ClientDBInfo changed. Change the behavior to be triggered only when proxies field in ClientDBInfo is changed.
This commit is contained in:
parent
eeaea60f94
commit
3efaaec479
|
@ -101,6 +101,8 @@ public:
|
|||
|
||||
// Key DB-specific information
|
||||
Reference<AsyncVar<ClientDBInfo>> clientInfo;
|
||||
AsyncTrigger masterProxiesChangeTrigger;
|
||||
Future<Void> monitorMasterProxiesInfoChange;
|
||||
Reference<ProxyInfo> masterProxies;
|
||||
UID masterProxiesLastChange;
|
||||
LocalityData clientLocality;
|
||||
|
|
|
@ -442,12 +442,25 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext *cx) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo, AsyncTrigger *triggerVar) {
|
||||
state vector< MasterProxyInterface > curProxies;
|
||||
curProxies = clientDBInfo->get().proxies;
|
||||
|
||||
loop{
|
||||
Void _ = wait(clientDBInfo->onChange());
|
||||
if (clientDBInfo->get().proxies != curProxies) {
|
||||
curProxies = clientDBInfo->get().proxies;
|
||||
triggerVar->trigger();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DatabaseContext::DatabaseContext(
|
||||
Reference<AsyncVar<ClientDBInfo>> clientInfo,
|
||||
Reference<Cluster> cluster, Future<Void> clientInfoMonitor,
|
||||
Standalone<StringRef> dbName, Standalone<StringRef> dbId,
|
||||
int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware )
|
||||
: clientInfo(clientInfo), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId),
|
||||
: clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId),
|
||||
transactionsReadVersions(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsPastVersions(0),
|
||||
transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), taskID(taskID),
|
||||
outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
|
||||
|
@ -460,6 +473,7 @@ DatabaseContext::DatabaseContext(
|
|||
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
|
||||
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
|
||||
|
||||
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
|
||||
clientStatusUpdater.actor = clientStatusUpdateActor(this);
|
||||
}
|
||||
|
||||
|
@ -542,6 +556,7 @@ Database DatabaseContext::create( Reference<AsyncVar<ClientDBInfo>> info, Future
|
|||
}
|
||||
|
||||
DatabaseContext::~DatabaseContext() {
|
||||
monitorMasterProxiesInfoChange.cancel();
|
||||
locationCacheLock.assertNotEntered();
|
||||
SSInterfaceCacheLock.assertNotEntered();
|
||||
SSInterfaceCache.clear();
|
||||
|
@ -643,7 +658,7 @@ void DatabaseContext::invalidateCache( std::vector<UID> const& ids ) {
|
|||
}
|
||||
|
||||
Future<Void> DatabaseContext::onMasterProxiesChanged() {
|
||||
return this->clientInfo->onChange();
|
||||
return this->masterProxiesChangeTrigger.onTrigger();
|
||||
}
|
||||
|
||||
int64_t extractIntOption( Optional<StringRef> value, int64_t minValue, int64_t maxValue ) {
|
||||
|
|
|
@ -1624,11 +1624,15 @@ ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db
|
|||
state Optional<Value> rateVal = wait(tr.get(fdbClientInfoTxnSampleRate));
|
||||
state Optional<Value> limitVal = wait(tr.get(fdbClientInfoTxnSizeLimit));
|
||||
ClientDBInfo clientInfo = db->clientInfo->get();
|
||||
clientInfo.clientTxnInfoSampleRate = rateVal.present() ? BinaryReader::fromStringRef<double>(rateVal.get(), Unversioned()) : std::numeric_limits<double>::infinity();
|
||||
clientInfo.clientTxnInfoSizeLimit = limitVal.present() ? BinaryReader::fromStringRef<int64_t>(limitVal.get(), Unversioned()) : -1;
|
||||
clientInfo.id = g_random->randomUniqueID();
|
||||
db->clientInfo->set(clientInfo);
|
||||
|
||||
double sampleRate = rateVal.present() ? BinaryReader::fromStringRef<double>(rateVal.get(), Unversioned()) : std::numeric_limits<double>::infinity();
|
||||
int64_t sizeLimit = limitVal.present() ? BinaryReader::fromStringRef<int64_t>(limitVal.get(), Unversioned()) : -1;
|
||||
if (sampleRate != clientInfo.clientTxnInfoSampleRate || sizeLimit != clientInfo.clientTxnInfoSampleRate) {
|
||||
clientInfo.id = g_random->randomUniqueID();
|
||||
clientInfo.clientTxnInfoSampleRate = sampleRate;
|
||||
clientInfo.clientTxnInfoSizeLimit = sizeLimit;
|
||||
db->clientInfo->set(clientInfo);
|
||||
}
|
||||
|
||||
state Future<Void> watchRateFuture = tr.watch(fdbClientInfoTxnSampleRate);
|
||||
state Future<Void> watchLimitFuture = tr.watch(fdbClientInfoTxnSizeLimit);
|
||||
Void _ = wait(tr.commit());
|
||||
|
|
Loading…
Reference in New Issue