Merge pull request #5062 from halfprice/zhewu/cc-health-monitor-interface

Add an updateWorkerHealth interface to ClusterController, and make ClusterController track reported worker health
Zhe Wu 2021-06-25 15:03:47 -07:00 committed by GitHub
commit dbae4f329e
7 changed files with 177 additions and 2 deletions
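At a high level, this change wires up a one-way health report from each worker to the cluster controller: the worker-side healthMonitor builds an UpdateWorkerHealthRequest listing the peers it currently considers degraded, and the cluster controller folds every report into a per-worker map of degradation start/refresh times (workerHealth). The bookkeeping rules are small: a newly reported peer gets startTime = lastRefreshTime = now(), a repeated peer only refreshes lastRefreshTime, a peer missing from a report is dropped, and an empty report removes the worker entirely. Below is a minimal, self-contained sketch of those rules in plain C++, using std::string addresses and an explicit clock in place of NetworkAddress and now(); it is illustrative only, not the FDB types or actor code.

#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Simplified stand-ins for the structures added in this diff (illustrative only).
struct DegradedTimes {
    double startTime = 0;
    double lastRefreshTime = 0;
};
struct WorkerHealth {
    std::unordered_map<std::string, DegradedTimes> degradedPeers;
};
struct HealthReport { // plays the role of UpdateWorkerHealthRequest
    std::string address;
    std::vector<std::string> degradedPeers;
};

// Mirrors the update rules of ClusterControllerData::updateWorkerHealth().
void updateWorkerHealth(std::unordered_map<std::string, WorkerHealth>& workerHealth,
                        const HealthReport& req,
                        double now) {
    if (req.degradedPeers.empty()) {
        workerHealth.erase(req.address); // an empty report clears the worker
        return;
    }
    auto& health = workerHealth[req.address]; // creates an entry for a previously unseen worker
    // Drop peers that were recorded before but are no longer reported as degraded.
    std::unordered_set<std::string> reported(req.degradedPeers.begin(), req.degradedPeers.end());
    for (auto it = health.degradedPeers.begin(); it != health.degradedPeers.end();) {
        if (reported.count(it->first) == 0) {
            it = health.degradedPeers.erase(it);
        } else {
            ++it;
        }
    }
    // New peers start their degradation window now; known peers only refresh.
    for (const auto& peer : req.degradedPeers) {
        auto it = health.degradedPeers.find(peer);
        if (it == health.degradedPeers.end()) {
            health.degradedPeers[peer] = { now, now };
        } else {
            it->second.lastRefreshTime = now;
        }
    }
}

int main() {
    std::unordered_map<std::string, WorkerHealth> workerHealth;
    updateWorkerHealth(workerHealth, { "worker:1", { "peerA", "peerB" } }, 1.0);
    updateWorkerHealth(workerHealth, { "worker:1", { "peerA", "peerC" } }, 2.0);
    const auto& peers = workerHealth["worker:1"].degradedPeers;
    assert(peers.size() == 2);                        // peerB recovered, peerC added
    assert(peers.at("peerA").startTime == 1.0);       // degradation window preserved
    assert(peers.at("peerA").lastRefreshTime == 2.0); // refreshed by the second report
    updateWorkerHealth(workerHealth, { "worker:1", {} }, 3.0);
    assert(workerHealth.count("worker:1") == 0);      // empty report removes the worker
    return 0;
}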

@@ -462,6 +462,7 @@ void ServerKnobs::initialize(Randomize _randomize, ClientKnobs* clientKnobs, IsS
init( REPLACE_INTERFACE_CHECK_DELAY, 5.0 );
init( COORDINATOR_REGISTER_INTERVAL, 5.0 );
init( CLIENT_REGISTER_INTERVAL, 600.0 );
init( CLUSTER_CONTROLLER_ENABLE_WORKER_HEALTH_MONITOR, false );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
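Note that the new knob defaults to false, so the cluster controller ignores incoming UpdateWorkerHealthRequest messages unless the feature is explicitly enabled (see the guard in clusterControllerCore further down); like other server knobs it can presumably be overridden at process startup through fdbserver's usual --knob_... mechanism.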

@@ -389,6 +389,7 @@ public:
double REPLACE_INTERFACE_CHECK_DELAY;
double COORDINATOR_REGISTER_INTERVAL;
double CLIENT_REGISTER_INTERVAL;
bool CLUSTER_CONTROLLER_ENABLE_WORKER_HEALTH_MONITOR;
// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)

@@ -2719,6 +2719,66 @@ public:
return idUsed;
}
// Updates worker health signals in `workerHealth` based on `req`.
void updateWorkerHealth(const UpdateWorkerHealthRequest& req) {
std::string degradedPeersString;
for (int i = 0; i < req.degradedPeers.size(); ++i) {
degradedPeersString += (i == 0 ? "" : " ") + req.degradedPeers[i].toString();
}
TraceEvent("ClusterControllerUpdateWorkerHealth")
.detail("WorkerAddress", req.address)
.detail("DegradedPeers", degradedPeersString);
// `req.degradedPeers` contains the latest peer performance view from the worker. Remove the worker from
// `workerHealth` if it no longer reports any degraded peers.
if (req.degradedPeers.empty()) {
workerHealth.erase(req.address);
return;
}
double currentTime = now();
// Current `workerHealth` doesn't have any information about the incoming worker. Add the worker into
// `workerHealth`.
if (workerHealth.find(req.address) == workerHealth.end()) {
workerHealth[req.address] = {};
for (const auto& degradedPeer : req.degradedPeers) {
workerHealth[req.address].degradedPeers[degradedPeer] = { currentTime, currentTime };
}
return;
}
// The incoming worker already exists in `workerHealth`.
auto& health = workerHealth[req.address];
// First, remove any degraded peers recorded in `workerHealth` that aren't in the incoming request. These
// machines' network performance should have recovered.
std::unordered_set<NetworkAddress> recoveredPeers;
for (const auto& [peer, times] : health.degradedPeers) {
recoveredPeers.insert(peer);
}
for (const auto& peer : req.degradedPeers) {
if (recoveredPeers.find(peer) != recoveredPeers.end()) {
recoveredPeers.erase(peer);
}
}
for (const auto& peer : recoveredPeers) {
health.degradedPeers.erase(peer);
}
// Update the worker's degradedPeers.
for (const auto& peer : req.degradedPeers) {
auto it = health.degradedPeers.find(peer);
if (it == health.degradedPeers.end()) {
health.degradedPeers[peer] = { currentTime, currentTime };
continue;
}
it->second.lastRefreshTime = currentTime;
}
}
std::map<Optional<Standalone<StringRef>>, WorkerInfo> id_worker;
std::map<Optional<Standalone<StringRef>>, ProcessClass>
id_class; // contains the mapping from process id to process class from the database
@@ -2757,6 +2817,18 @@ public:
Optional<UID> recruitingRatekeeperID;
AsyncVar<bool> recruitRatekeeper;
// Stores the health information from a particular worker's perspective.
struct WorkerHealth {
struct DegradedTimes {
double startTime = 0;
double lastRefreshTime = 0;
};
std::unordered_map<NetworkAddress, DegradedTimes> degradedPeers;
// TODO(zhewu): Include disk and CPU signals.
};
std::unordered_map<NetworkAddress, WorkerHealth> workerHealth;
CounterCollection clusterControllerMetrics;
Counter openDatabaseRequests;
@@ -4537,6 +4609,11 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
++self.registerMasterRequests;
clusterRegisterMaster(&self, req);
}
when(UpdateWorkerHealthRequest req = waitNext(interf.updateWorkerHealth.getFuture())) {
if (SERVER_KNOBS->CLUSTER_CONTROLLER_ENABLE_WORKER_HEALTH_MONITOR) {
self.updateWorkerHealth(req);
}
}
when(GetServerDBInfoRequest req = waitNext(interf.getServerDBInfo.getFuture())) {
self.addActor.send(clusterGetServerInfo(&self.db, req.knownServerInfoID, req.reply));
}
@@ -4631,3 +4708,67 @@ ACTOR Future<Void> clusterController(Reference<ClusterConnectionFile> connFile,
hasConnected = true;
}
}
namespace {
// Tests that `ClusterControllerData::updateWorkerHealth()` updates `ClusterControllerData::workerHealth`
// correctly based on an `UpdateWorkerHealthRequest`.
TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
state ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
state NetworkAddress workerAddress(IPAddress(0x01010101), 1);
state NetworkAddress badPeer1(IPAddress(0x02020202), 1);
state NetworkAddress badPeer2(IPAddress(0x03030303), 1);
state NetworkAddress badPeer3(IPAddress(0x04040404), 1);
// Create an `UpdateWorkerHealthRequest` with two bad peers; they should appear in `workerAddress`'s
// degradedPeers.
{
UpdateWorkerHealthRequest req;
req.address = workerAddress;
req.degradedPeers.push_back(badPeer1);
req.degradedPeers.push_back(badPeer2);
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
ASSERT_EQ(health.degradedPeers.size(), 2);
ASSERT(health.degradedPeers.find(badPeer1) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer1].startTime, health.degradedPeers[badPeer1].lastRefreshTime);
ASSERT(health.degradedPeers.find(badPeer2) != health.degradedPeers.end());
}
// Create an `UpdateWorkerHealthRequest` with two bad peers: one repeated from the previous request and a new one.
// The repeated peer should have its lastRefreshTime updated, and the previously reported peer that is absent
// from this request should be removed.
{
// Advance time so that now() is guaranteed to return a larger value than before.
wait(delay(0.001));
UpdateWorkerHealthRequest req;
req.address = workerAddress;
req.degradedPeers.push_back(badPeer1);
req.degradedPeers.push_back(badPeer3);
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
ASSERT_EQ(health.degradedPeers.size(), 2);
ASSERT(health.degradedPeers.find(badPeer1) != health.degradedPeers.end());
ASSERT_LT(health.degradedPeers[badPeer1].startTime, health.degradedPeers[badPeer1].lastRefreshTime);
ASSERT(health.degradedPeers.find(badPeer2) == health.degradedPeers.end());
ASSERT(health.degradedPeers.find(badPeer3) != health.degradedPeers.end());
}
// Create an `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should remove the worker from
// `workerHealth`.
{
UpdateWorkerHealthRequest req;
req.address = workerAddress;
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) == data.workerHealth.end());
}
return Void();
}
} // namespace
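Nothing in this commit consumes workerHealth yet; the cluster controller only records it. As a sketch of how a later change might use the startTime/lastRefreshTime pair, a consumer could treat a peer as genuinely degraded only after a worker has kept reporting it for some minimum duration, and ignore entries that have gone stale (not refreshed recently, e.g. because the reporting worker stopped sending updates). The helper and both thresholds below are hypothetical and not part of this PR; the snippet redefines the same simplified string-keyed structures as the earlier sketch so it stands alone.

#include <cstdio>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Same simplified mirror of the new structures as in the earlier sketch (illustrative only).
struct DegradedTimes {
    double startTime = 0;
    double lastRefreshTime = 0;
};
struct WorkerHealth {
    std::unordered_map<std::string, DegradedTimes> degradedPeers;
};

// Hypothetical consumer (not in this PR): collect peers that some worker has considered
// degraded for at least minDegradedDuration seconds, skipping entries that have not been
// refreshed within staleTimeout seconds.
std::unordered_set<std::string> persistentlyDegradedPeers(
    const std::unordered_map<std::string, WorkerHealth>& workerHealth,
    double now,
    double minDegradedDuration,
    double staleTimeout) {
    std::unordered_set<std::string> result;
    for (const auto& [worker, health] : workerHealth) {
        for (const auto& [peer, times] : health.degradedPeers) {
            bool fresh = now - times.lastRefreshTime <= staleTimeout;
            bool longEnough = times.lastRefreshTime - times.startTime >= minDegradedDuration;
            if (fresh && longEnough) {
                result.insert(peer);
            }
        }
    }
    return result;
}

int main() {
    std::unordered_map<std::string, WorkerHealth> workerHealth;
    workerHealth["worker:1"].degradedPeers["peerA"] = { 100.0, 140.0 }; // degraded for 40s, recently refreshed
    workerHealth["worker:1"].degradedPeers["peerB"] = { 141.0, 141.0 }; // only just reported
    auto bad = persistentlyDegradedPeers(workerHealth, 142.0, 30.0, 10.0);
    std::printf("persistently degraded peers: %zu\n", bad.size()); // prints 1 (only peerA)
    return 0;
}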

@@ -149,6 +149,7 @@ struct ClusterControllerFullInterface {
RequestStream<struct RegisterWorkerRequest> registerWorker;
RequestStream<struct GetWorkersRequest> getWorkers;
RequestStream<struct RegisterMasterRequest> registerMaster;
RequestStream<struct UpdateWorkerHealthRequest> updateWorkerHealth;
RequestStream<struct GetServerDBInfoRequest>
getServerDBInfo; // only used by testers; the cluster controller will send the serverDBInfo to workers
@@ -160,7 +161,8 @@ struct ClusterControllerFullInterface {
return clientInterface.hasMessage() || recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() || recruitStorage.getFuture().isReady() ||
registerWorker.getFuture().isReady() || getWorkers.getFuture().isReady() ||
registerMaster.getFuture().isReady() || getServerDBInfo.getFuture().isReady();
registerMaster.getFuture().isReady() || updateWorkerHealth.getFuture().isReady() ||
getServerDBInfo.getFuture().isReady();
}
void initEndpoints() {
@@ -171,6 +173,7 @@ struct ClusterControllerFullInterface {
registerWorker.getEndpoint(TaskPriority::ClusterControllerWorker);
getWorkers.getEndpoint(TaskPriority::ClusterController);
registerMaster.getEndpoint(TaskPriority::ClusterControllerRegister);
updateWorkerHealth.getEndpoint(TaskPriority::ClusterController);
getServerDBInfo.getEndpoint(TaskPriority::ClusterController);
}
@@ -187,6 +190,7 @@ struct ClusterControllerFullInterface {
registerWorker,
getWorkers,
registerMaster,
updateWorkerHealth,
getServerDBInfo);
}
};
@@ -418,6 +422,20 @@ struct GetWorkersRequest {
}
};
struct UpdateWorkerHealthRequest {
constexpr static FileIdentifier file_identifier = 5789927;
NetworkAddress address;
std::vector<NetworkAddress> degradedPeers;
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, address, degradedPeers);
}
};
struct InitializeTLogRequest {
constexpr static FileIdentifier file_identifier = 15604392;
UID recruitmentID;

@@ -687,6 +687,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
addressesInDbAndPrimaryDc(interf.addresses(), dbInfo) && ccInterface->get().present()) {
nextHealthCheckDelay = delay(SERVER_KNOBS->WORKER_HEALTH_MONITOR_INTERVAL);
const auto& allPeers = FlowTransport::transport().getAllPeers();
UpdateWorkerHealthRequest req;
for (const auto& [address, peer] : allPeers) {
if (peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
// Ignore peers that don't have enough samples.
@@ -724,9 +725,14 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
.detail("Count", peer->pingLatencies.getPopulationSize())
.detail("TimeoutCount", peer->timeoutCount);
// TODO(zhewu): Keep track of degraded peers and send them to cluster controller.
req.degradedPeers.push_back(address);
}
}
if (!req.degradedPeers.empty()) {
req.address = FlowTransport::transport().getLocalAddress();
ccInterface->get().get().updateWorkerHealth.send(req);
}
}
choose {
when(wait(nextHealthCheckDelay)) {}

@@ -88,6 +88,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES SpecificUnitTest.txt IGNORE)
add_fdb_test(TEST_FILES StorageMetricsSampleTests.txt IGNORE)
add_fdb_test(TEST_FILES WorkerTests.txt IGNORE)
add_fdb_test(TEST_FILES ClusterControllerTests.txt IGNORE)
add_fdb_test(TEST_FILES StorageServerInterface.txt)
add_fdb_test(TEST_FILES StreamingWrite.txt IGNORE)
add_fdb_test(TEST_FILES SystemData.txt)

@@ -0,0 +1,7 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/fdbserver/clustercontroller/
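For context: this spec drives the generic UnitTests workload over every TEST_CASE whose name starts with /fdbserver/clustercontroller/, which picks up the new updateWorkerHealth test above. Since the CMake entry registers the file with IGNORE, it is presumably excluded from the default ctest run and has to be invoked explicitly.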