Addressing comments.
This commit is contained in:
parent
4e3e2b0392
commit
07ee4029f2
|
@ -1438,8 +1438,8 @@ NetworkAddress FlowTransport::getLocalAddress() const {
|
|||
return self->localAddresses.address;
|
||||
}
|
||||
|
||||
std::unordered_map<NetworkAddress, Reference<struct Peer>>* FlowTransport::getAllPeers() {
|
||||
return &self->peers;
|
||||
const std::unordered_map<NetworkAddress, Reference<Peer>>& FlowTransport::getAllPeers() const {
|
||||
return self->peers;
|
||||
}
|
||||
|
||||
std::map<NetworkAddress, std::pair<uint64_t, double>>* FlowTransport::getIncompatiblePeers() {
|
||||
|
|
|
@ -198,7 +198,7 @@ public:
|
|||
NetworkAddressList getLocalAddresses() const;
|
||||
|
||||
// Returns all peers that the FlowTransport is monitoring.
|
||||
std::unordered_map<NetworkAddress, Reference<struct Peer>>* getAllPeers();
|
||||
const std::unordered_map<NetworkAddress, Reference<Peer>>& getAllPeers() const;
|
||||
|
||||
// Returns the same of all peers that have attempted to connect, but have incompatible protocol versions
|
||||
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
|
||||
|
|
|
@ -555,10 +555,11 @@ public:
|
|||
double MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS;
|
||||
double DBINFO_FAILED_DELAY;
|
||||
bool ENABLE_WORKER_HEALTH_MONITOR;
|
||||
double WORKER_HEALTH_MONITOR_INTERVAL;
|
||||
double PEER_LATENCY_DEGRADATION_PERCENTILE;
|
||||
double PEER_LATENCY_DEGRADATION_THRESHOLD;
|
||||
double PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD;
|
||||
double WORKER_HEALTH_MONITOR_INTERVAL; // Interval between two health monitor health check.
|
||||
int PEER_LATENCY_CHECK_MIN_POPULATION; // The minimum number of latency samples required to check a peer.
|
||||
double PEER_LATENCY_DEGRADATION_PERCENTILE; // The percentile latency used to check peer health.
|
||||
double PEER_LATENCY_DEGRADATION_THRESHOLD; // The latency threshold to consider a peer degraded.
|
||||
double PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD; // The percentage of timeout to consider a peer degraded.
|
||||
|
||||
// Test harness
|
||||
double WORKER_POLL_DELAY;
|
||||
|
|
|
@ -596,7 +596,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
|
|||
bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
|
||||
const auto& dbi = dbInfo->get();
|
||||
|
||||
if (dbi.master.addresses().contain(address)) {
|
||||
if (dbi.master.addresses().contains(address)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -624,7 +624,7 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<S
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tlog.interf().addresses().contain(address)) {
|
||||
if (tlog.interf().addresses().contains(address)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -633,7 +633,7 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<S
|
|||
return false;
|
||||
}
|
||||
|
||||
bool addressInDbAndPrimaryDc(const NetworkAddressList& addresses, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
|
||||
bool addressesInDbAndPrimaryDc(const NetworkAddressList& addresses, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
|
||||
return addressInDbAndPrimaryDc(addresses.address, dbInfo) ||
|
||||
(addresses.secondaryAddress.present() && addressInDbAndPrimaryDc(addresses.secondaryAddress.get(), dbInfo));
|
||||
}
|
||||
|
@ -685,12 +685,12 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
|
|||
loop {
|
||||
Future<Void> nextHealthCheckDelay = Never();
|
||||
if (dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
|
||||
addressInDbAndPrimaryDc(interf.addresses(), dbInfo) && ccInterface->get().present()) {
|
||||
addressesInDbAndPrimaryDc(interf.addresses(), dbInfo) && ccInterface->get().present()) {
|
||||
nextHealthCheckDelay = delay(SERVER_KNOBS->WORKER_HEALTH_MONITOR_INTERVAL);
|
||||
auto* allPeers = FlowTransport::transport().getAllPeers();
|
||||
for (auto& [address, peer] : *allPeers) {
|
||||
if (peer->pingLatencies.getPopulationSize() < 50) {
|
||||
// Ignore peers that do not have enough samples.
|
||||
const auto& allPeers = FlowTransport::transport().getAllPeers();
|
||||
for (const auto& [address, peer] : allPeers) {
|
||||
if (peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
|
||||
// Ignore peers that don't have enough samples.
|
||||
// TODO(zhewu): Currently, FlowTransport latency monitor clears ping latency samples on a regular
|
||||
// basis, which may affect the measurement count. Currently,
|
||||
// WORKER_HEALTH_MONITOR_INTERVAL is much smaller than the ping clearance interval, so
|
||||
|
@ -712,6 +712,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
|
|||
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
|
||||
// This is a degraded peer.
|
||||
TraceEvent("HealthMonitorDetectDegradedPeer")
|
||||
.suppressFor(30)
|
||||
.detail("Peer", address)
|
||||
.detail("Elapsed", now() - peer->lastLoggedTime)
|
||||
.detail("MinLatency", peer->pingLatencies.min())
|
||||
|
|
|
@ -306,7 +306,7 @@ struct NetworkAddressList {
|
|||
return address.toString() + ", " + secondaryAddress.get().toString();
|
||||
}
|
||||
|
||||
bool contain(const NetworkAddress& r) const {
|
||||
bool contains(const NetworkAddress& r) const {
|
||||
return address == r || (secondaryAddress.present() && secondaryAddress.get() == r);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue