Merge pull request #1059 from vishesh/task/monitor-leader-on-demand
Monitor leader only when proxies are unknown or any dies
This commit is contained in:
commit
a66e494de6
|
@ -541,6 +541,7 @@ DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), laten
|
|||
ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo, Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed ) {
|
||||
try {
|
||||
state Optional<double> incorrectTime;
|
||||
state Future<Void> leaderMon = ccf ? monitorLeader(ccf, clusterInterface) : Void();
|
||||
loop {
|
||||
OpenDatabaseRequest req;
|
||||
req.knownClientInfoID = outInfo->get().id;
|
||||
|
@ -571,6 +572,36 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
|
|||
when( ClientDBInfo ni = wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().openDatabase.getReply( req ) ) : Never() ) ) {
|
||||
TraceEvent("ClientInfoChange").detail("ChangeID", ni.id);
|
||||
outInfo->set(ni);
|
||||
|
||||
if (ni.proxies.empty()) {
|
||||
TraceEvent("ClientInfo_NoProxiesReturned").detail("ChangeID", ni.id);
|
||||
continue;
|
||||
} else if (!FlowTransport::transport().isClient()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
vector<Future<Void>> onProxyFailureVec;
|
||||
bool skipWaitForProxyFail = false;
|
||||
for (const auto& proxy : ni.proxies) {
|
||||
if (proxy.provisional) {
|
||||
skipWaitForProxyFail = true;
|
||||
break;
|
||||
}
|
||||
|
||||
onProxyFailureVec.push_back(
|
||||
IFailureMonitor::failureMonitor().onDisconnectOrFailure(
|
||||
proxy.getConsistentReadVersion.getEndpoint()) ||
|
||||
IFailureMonitor::failureMonitor().onDisconnectOrFailure(proxy.commit.getEndpoint()) ||
|
||||
IFailureMonitor::failureMonitor().onDisconnectOrFailure(
|
||||
proxy.getKeyServersLocations.getEndpoint()) ||
|
||||
IFailureMonitor::failureMonitor().onDisconnectOrFailure(
|
||||
proxy.getStorageServerRejoinInfo.getEndpoint()));
|
||||
}
|
||||
if (skipWaitForProxyFail) continue;
|
||||
|
||||
leaderMon = Void();
|
||||
wait(waitForAny(onProxyFailureVec));
|
||||
leaderMon = ccf ? monitorLeader(ccf, clusterInterface) : Void();
|
||||
}
|
||||
when( wait( clusterInterface->onChange() ) ) {
|
||||
if(clusterInterface->get().present())
|
||||
|
@ -851,7 +882,6 @@ void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientI
|
|||
uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) );
|
||||
}
|
||||
|
||||
leaderMon = monitorLeader( connFile, clusterInterface, connectedCoordinatorsNum );
|
||||
failMon = failureMonitorClient( clusterInterface, false );
|
||||
}
|
||||
}
|
||||
|
|
|
@ -132,7 +132,6 @@ private:
|
|||
Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface;
|
||||
Reference<ClusterConnectionFile> connectionFile;
|
||||
|
||||
Future<Void> leaderMon;
|
||||
Future<Void> failMon;
|
||||
Future<Void> connected;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue