- Add a special key in order to fetch a specific group of status json fields.

This commit is contained in:
Sreenath Bodagala 2023-09-25 16:23:19 +00:00
parent aa1d005cc5
commit 3c01b1befe
10 changed files with 252 additions and 60 deletions

View File

@ -1373,7 +1373,7 @@ void DatabaseContext::registerSpecialKeysImpl(SpecialKeySpace::MODULE module,
}
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
ACTOR Future<Optional<Value>> getJSON(Database db);
ACTOR Future<Optional<Value>> getJSON(Database db, std::string jsonField = "");
struct SingleSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
@ -1600,6 +1600,15 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
smoothMidShardSize.reset(CLIENT_KNOBS->INIT_MID_SHARD_BYTES);
globalConfig = std::make_unique<GlobalConfig>(this);
if (apiVersion.version() >= 740) {
registerSpecialKeysImpl(
SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<FaultToleranceMetricsImpl>(
singleKeyRange("fault_tolerance_metrics_json"_sr)
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METRICS).begin)));
}
if (apiVersion.version() >= 700) {
registerSpecialKeysImpl(SpecialKeySpace::MODULE::ERRORMSG,
SpecialKeySpace::IMPLTYPE::READONLY,

View File

@ -1598,8 +1598,8 @@ Optional<Value> getValueFromJSON(StatusObject statusObj) {
}
}
// Fetches the cluster status for `db` through StatusClient::statusFetcher and returns whatever
// getValueFromJSON extracts from the resulting StatusObject.
// `jsonField` is forwarded unchanged to statusFetcher; it defaults to the empty string.
ACTOR Future<Optional<Value>> getJSON(Database db) {
StatusObject statusObj = wait(StatusClient::statusFetcher(db));
ACTOR Future<Optional<Value>> getJSON(Database db, std::string jsonField = "") {
StatusObject statusObj = wait(StatusClient::statusFetcher(db, jsonField));
return getValueFromJSON(statusObj);
}

View File

@ -2903,6 +2903,29 @@ Future<RangeResult> WorkerInterfacesSpecialKeyImpl::getRange(ReadYourWritesTrans
return workerInterfacesImplGetRangeActor(ryw, getKeyRange().begin, kr);
}
ACTOR Future<Optional<Value>> getJSON(Database db, std::string jsonField = "");
// Serves the single fault-tolerance metrics special key: asks getJSON for the "fault_tolerance"
// status document and, when a value comes back, returns it as the value stored at `kr.begin`.
// The result is empty when the transaction's database has no pointer or no connection record,
// or when getJSON yields no value.
ACTOR static Future<RangeResult> FaultToleranceMetricsImplActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
	// KeyRangeRef does not own the bytes it points at, so take an owning copy of the key before
	// the wait() below. After the actor suspends, nothing borrowed from the caller is read.
	state Key resultKey = kr.begin;
	state RangeResult res;
	if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
		Optional<Value> val = wait(getJSON(ryw->getDatabase(), "fault_tolerance"));
		if (val.present()) {
			res.push_back_deep(res.arena(), KeyValueRef(resultKey, val.get()));
		}
	}
	return res;
}
// Forwards `kr` to the SpecialKeyRangeReadImpl base constructor; no other state is set up here.
FaultToleranceMetricsImpl::FaultToleranceMetricsImpl(KeyRangeRef kr)
  : SpecialKeyRangeReadImpl(kr) {}
// Reads the fault-tolerance metrics key by delegating to FaultToleranceMetricsImplActor.
// `limitsHint` is accepted to satisfy the interface but is not consulted.
Future<RangeResult> FaultToleranceMetricsImpl::getRange(ReadYourWritesTransaction* ryw,
                                                        KeyRangeRef kr,
                                                        GetRangeLimits limitsHint) const {
	// This implementation covers a single-key range, so a query routed here is expected to span
	// exactly the range it was constructed with.
	ASSERT(kr == getKeyRange());
	return FaultToleranceMetricsImplActor(ryw, kr);
}
ACTOR Future<Void> validateSpecialSubrangeRead(ReadYourWritesTransaction* ryw,
KeySelector begin,
KeySelector end,

View File

@ -418,8 +418,10 @@ ACTOR Future<StatusObject> clientStatusFetcher(Reference<IClusterConnectionRecor
}
// Cluster section of json output
ACTOR Future<Optional<StatusObject>> clusterStatusFetcher(ClusterInterface cI, StatusArray* messages) {
state StatusRequest req;
ACTOR Future<Optional<StatusObject>> clusterStatusFetcher(ClusterInterface cI,
StatusArray* messages,
std::string statusField) {
state StatusRequest req(statusField);
state Future<Void> clusterTimeout = delay(30.0);
state Optional<StatusObject> oStatusObj;
@ -506,7 +508,8 @@ StatusObject getClientDatabaseStatus(StatusObjectReader client, StatusObjectRead
}
ACTOR Future<StatusObject> statusFetcherImpl(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface) {
Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface,
std::string statusField) {
if (!g_network)
throw network_not_setup();
@ -548,7 +551,7 @@ ACTOR Future<StatusObject> statusFetcherImpl(Reference<IClusterConnectionRecord>
loop {
if (clusterInterface->get().present()) {
Optional<StatusObject> _statusObjCluster =
wait(clusterStatusFetcher(clusterInterface->get().get(), &clientMessages));
wait(clusterStatusFetcher(clusterInterface->get().get(), &clientMessages, statusField));
if (_statusObjCluster.present()) {
statusObjCluster = _statusObjCluster.get();
// TODO: this is a temporary fix, getting the number of available coordinators should move to
@ -622,12 +625,12 @@ ACTOR Future<Void> timeoutMonitorLeader(Database db) {
}
}
// Entry point for fetching the status document on behalf of `db`.
// `statusField` is passed through to statusFetcherImpl unchanged.
Future<StatusObject> StatusClient::statusFetcher(Database db) {
Future<StatusObject> StatusClient::statusFetcher(Database db, std::string statusField) {
// Record the time of this status fetch on the database.
db->lastStatusFetch = now();
// Created on demand: when the database has no status cluster-interface variable yet, make one
// and start timeoutMonitorLeader, keeping its future in db->statusLeaderMon.
if (!db->statusClusterInterface) {
db->statusClusterInterface = makeReference<AsyncVar<Optional<ClusterInterface>>>();
db->statusLeaderMon = timeoutMonitorLeader(db);
}
return statusFetcherImpl(db->getConnectionRecord(), db->statusClusterInterface);
return statusFetcherImpl(db->getConnectionRecord(), db->statusClusterInterface, statusField);
}

View File

@ -245,10 +245,14 @@ struct StatusReply {
// Request message for the cluster status document; the answer is delivered through `reply`.
struct StatusRequest {
constexpr static FileIdentifier file_identifier = 14419140;
ReplyPromise<struct StatusReply> reply;
// Name of the status field being requested. Empty when the request is default-constructed.
// Elsewhere in this change, clusterGetStatus special-cases the value "fault_tolerance".
std::string statusField;
StatusRequest() {}
explicit StatusRequest(std::string statusField) : statusField(statusField) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
// statusField is serialized after reply.
serializer(ar, reply, statusField);
}
};

View File

@ -571,6 +571,14 @@ public:
GetRangeLimits limitsHint) const override;
};
// Special-key range implementation that serves the fault-tolerance metrics key.
// DatabaseContext registers it with IMPLTYPE::READONLY under the METRICS module.
class FaultToleranceMetricsImpl : public SpecialKeyRangeReadImpl {
public:
	// `kr` is the key range handed to the SpecialKeyRangeReadImpl base.
	explicit FaultToleranceMetricsImpl(KeyRangeRef kr);

	// Produces the key-value pairs for `kr`, read on behalf of transaction `ryw`.
	Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
	                             KeyRangeRef kr,
	                             GetRangeLimits limitsHint) const override;
};
// If the underlying set of key-value pairs of a key space is not changing, then we expect repeating a read to give the
// same result. Additionally, we can generate the expected result of any read if that read is reading a subrange. This
// actor performs a read of an arbitrary subrange of [begin, end) and validates the results.

View File

@ -28,7 +28,7 @@
// Client-side entry point for retrieving the status document.
class StatusClient {
public:
enum StatusLevel { MINIMAL = 0, NORMAL = 1, DETAILED = 2, JSON = 3 };
// Fetches the status document for `db`. `statusField` names the status field to request and
// defaults to the empty string.
static Future<StatusObject> statusFetcher(Database db);
static Future<StatusObject> statusFetcher(Database db, std::string statusField = "");
};
#endif

View File

@ -1489,54 +1489,60 @@ ACTOR Future<Void> statusServer(FutureStream<StatusRequest> requests,
}
}
// Get status but trap errors to send back to client.
std::vector<WorkerDetails> workers;
std::vector<ProcessIssues> workerIssues;
for (auto& it : self->id_worker) {
workers.push_back(it.second.details);
if (it.second.issues.size()) {
workerIssues.emplace_back(it.second.details.interf.address(), it.second.issues);
}
}
std::vector<NetworkAddress> incompatibleConnections;
for (auto it = self->db.incompatibleConnections.begin(); it != self->db.incompatibleConnections.end();) {
if (it->second < now()) {
it = self->db.incompatibleConnections.erase(it);
} else {
incompatibleConnections.push_back(it->first);
it++;
}
}
state ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo,
self->cx,
workers,
workerIssues,
self->storageStatusInfos,
&self->db.clientStatus,
coordinators,
incompatibleConnections,
self->datacenterVersionDifference,
configBroadcaster,
self->db.metaclusterRegistration,
self->db.metaclusterMetrics)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();
// Update last_request_time now because GetStatus is finished and the delay is to be measured between
// requests
last_request_time = now();
while (!requests_batch.empty()) {
if (result.isError())
requests_batch.back().reply.sendError(result.getError());
else
requests_batch.back().reply.send(result.get());
requests_batch.pop_back();
wait(yield());
state std::string requestedStatusJsonField = requests_batch.back().statusField;
// Get status but trap errors to send back to client.
std::vector<WorkerDetails> workers;
std::vector<ProcessIssues> workerIssues;
for (auto& it : self->id_worker) {
workers.push_back(it.second.details);
if (it.second.issues.size()) {
workerIssues.emplace_back(it.second.details.interf.address(), it.second.issues);
}
}
std::vector<NetworkAddress> incompatibleConnections;
for (auto it = self->db.incompatibleConnections.begin();
it != self->db.incompatibleConnections.end();) {
if (it->second < now()) {
it = self->db.incompatibleConnections.erase(it);
} else {
incompatibleConnections.push_back(it->first);
it++;
}
}
state ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo,
self->cx,
workers,
workerIssues,
self->storageStatusInfos,
&self->db.clientStatus,
coordinators,
incompatibleConnections,
self->datacenterVersionDifference,
configBroadcaster,
self->db.metaclusterRegistration,
self->db.metaclusterMetrics,
requestedStatusJsonField)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();
// Update last_request_time now because GetStatus is finished and the delay is to be measured between
// requests
last_request_time = now();
while (!requests_batch.empty() && requests_batch.back().statusField == requestedStatusJsonField) {
if (result.isError())
requests_batch.back().reply.sendError(result.getError());
else
requests_batch.back().reply.send(result.get());
requests_batch.pop_back();
wait(yield());
}
}
} catch (Error& e) {
TraceEvent(SevError, "StatusServerError").error(e);

View File

@ -3032,7 +3032,13 @@ ACTOR Future<StatusReply> clusterGetStatus(
Version datacenterVersionDifference,
ConfigBroadcaster const* configBroadcaster,
Optional<UnversionedMetaclusterRegistrationEntry> metaclusterRegistration,
metacluster::MetaclusterMetrics metaclusterMetrics) {
metacluster::MetaclusterMetrics metaclusterMetrics,
std::string requestedStatusJsonField) {
if (requestedStatusJsonField == "fault_tolerance") {
StatusReply reply = wait(clusterGetFaultToleranceStatus(db, cx, workers, coordinators));
return reply;
}
state double tStart = timer();
state JsonBuilderArray messages;
@ -3676,6 +3682,133 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
}
// Builds a reduced status reply; clusterGetStatus calls this when the requested status field is
// "fault_tolerance".
//
// The reply object may contain these keys: "recovery_state", "full_replication",
// "maintenance_zone", "maintenance_seconds_remaining", "data_distribution_disabled_for_ss_failures",
// "data", "logs", "cluster_controller_timestamp", and always "messages".
//
// Parameters:
//   db           - server DB info; used to find the master, cluster controller and data
//                  distributor addresses and the current recovery state.
//   cx           - database handle passed to recoveryStateStatusFetcher and loadConfiguration.
//   workers      - known workers; searched by address to resolve the role workers above.
//   coordinators - not referenced anywhere in this body.
//
// Any Error raised inside the try block is traced as SevError "StatusError" and rethrown.
ACTOR Future<StatusReply> clusterGetFaultToleranceStatus(Reference<AsyncVar<ServerDBInfo>> db,
Database cx,
std::vector<WorkerDetails> workers,
ServerCoordinators coordinators) {
state double tStart = timer();
state JsonBuilderArray messages;
state std::set<std::string> status_incomplete_reasons;
state WorkerDetails mWorker; // Master worker
state WorkerDetails ccWorker; // Cluster-Controller worker
state WorkerDetails ddWorker; // DataDistributor worker
try {
state JsonBuilderObject statusObj;
// Get the master Worker interface
// When a role worker cannot be found, a message is recorded and the default-constructed
// WorkerDetails declared above is used for the rest of the function.
Optional<WorkerDetails> _mWorker = getWorker(workers, db->get().master.address());
if (_mWorker.present()) {
mWorker = _mWorker.get();
} else {
messages.push_back(
JsonString::makeMessage("unreachable_master_worker", "Unable to locate the master worker."));
}
// Get the cluster-controller Worker interface
Optional<WorkerDetails> _ccWorker = getWorker(workers, db->get().clusterInterface.address());
if (_ccWorker.present()) {
ccWorker = _ccWorker.get();
} else {
messages.push_back(JsonString::makeMessage("unreachable_cluster_controller_worker",
"Unable to locate the cluster-controller worker."));
}
// Get the DataDistributor worker interface
// The distributor is optional in ServerDBInfo, so only look it up when it is present.
Optional<WorkerDetails> _ddWorker;
if (db->get().distributor.present()) {
_ddWorker = getWorker(workers, db->get().distributor.get().address());
}
if (!db->get().distributor.present() || !_ddWorker.present()) {
messages.push_back(JsonString::makeMessage("unreachable_dataDistributor_worker",
"Unable to locate the data distributor worker."));
} else {
ddWorker = _ddWorker.get();
}
// construct status information for cluster subsections
// statusCode is handed to recoveryStateStatusFetcher by pointer and inspected afterwards;
// presumably the fetcher overwrites it with the observed recovery status (confirm there).
state int statusCode = (int)RecoveryStatus::END;
state JsonBuilderObject recoveryStateStatus = wait(
recoveryStateStatusFetcher(cx, ccWorker, mWorker, workers.size(), &status_incomplete_reasons, &statusCode));
if (!recoveryStateStatus.empty())
statusObj["recovery_state"] = recoveryStateStatus;
state Optional<DatabaseConfiguration> configuration;
state Optional<LoadConfigurationResult> loadResult;
state std::unordered_map<NetworkAddress, WorkerInterface> address_workers;
// The configuration is only loaded when recovery did not report it as missing; otherwise
// both optionals stay empty and the sections that depend on them are skipped.
if (statusCode != RecoveryStatus::configuration_missing) {
std::pair<Optional<DatabaseConfiguration>, Optional<LoadConfigurationResult>> loadResults =
wait(loadConfiguration(cx, &messages, &status_incomplete_reasons));
configuration = loadResults.first;
loadResult = loadResults.second;
}
if (loadResult.present()) {
statusObj["full_replication"] = loadResult.get().fullReplication;
if (loadResult.get().healthyZone.present()) {
// A healthy zone equal to ignoreSSFailuresZoneString is reported as a flag rather than as
// a maintenance zone.
if (loadResult.get().healthyZone.get() != ignoreSSFailuresZoneString) {
statusObj["maintenance_zone"] = loadResult.get().healthyZone.get().printable();
statusObj["maintenance_seconds_remaining"] = loadResult.get().healthyZoneSeconds;
} else {
statusObj["data_distribution_disabled_for_ss_failures"] = true;
}
}
}
if (configuration.present()) {
// Index worker interfaces by address for tlogFetcher below.
for (auto const& worker : workers) {
address_workers[worker.interf.address()] = worker.interf;
}
// Passed to dataStatusFetcher by pointer; its value is not read again in this function.
state int minStorageReplicasRemaining = -1;
JsonBuilderObject clusterDataSection =
wait(dataStatusFetcher(ddWorker, configuration.get(), &minStorageReplicasRemaining));
// If data section not empty, add it to statusObj
if (!clusterDataSection.empty())
statusObj["data"] = clusterDataSection;
// Passed to tlogFetcher by pointer; its value is not read again in this function.
int logFaultTolerance = 100;
// The "logs" section is only produced once recovery has reached ACCEPTING_COMMITS.
if (db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
statusObj["logs"] = tlogFetcher(&logFaultTolerance, db, address_workers);
}
}
// Create the status_incomplete message if there were any reasons that the status is incomplete.
if (!status_incomplete_reasons.empty()) {
JsonBuilderObject incomplete_message =
JsonBuilder::makeMessage("status_incomplete", "Unable to retrieve all status information.");
// Make a JSON array of all of the reasons in the status_incomplete_reasons set.
JsonBuilderArray reasons;
for (auto i : status_incomplete_reasons) {
reasons.push_back(JsonBuilderObject().setKey("description", i));
}
incomplete_message["reasons"] = reasons;
messages.push_back(incomplete_message);
}
statusObj["messages"] = messages;
// The timestamp is omitted when the converted timer value equals -1.
int64_t clusterTime = g_network->timer();
if (clusterTime != -1) {
statusObj["cluster_controller_timestamp"] = clusterTime;
}
TraceEvent("ClusterGetFaultToleranceStatus")
.detail("Duration", timer() - tStart)
.detail("StatusSize", statusObj.getFinalLength());
return StatusReply(statusObj.getJson());
} catch (Error& e) {
// Log the failure and propagate it to the caller unchanged.
TraceEvent(SevError, "StatusError").error(e);
throw;
}
}
bool checkAsciiNumber(const char* s) {
JsonBuilderObject number;
number.setKeyRawNumber("number", s);

View File

@ -55,7 +55,13 @@ Future<StatusReply> clusterGetStatus(
Version const& datacenterVersionDifference,
ConfigBroadcaster const* const& conifgBroadcaster,
Optional<UnversionedMetaclusterRegistrationEntry> const& metaclusterRegistration,
metacluster::MetaclusterMetrics const& metaclusterMetrics);
metacluster::MetaclusterMetrics const& metaclusterMetrics,
std::string const& requestedStatusJsonField);
// Builds the reduced status reply that clusterGetStatus returns when the requested status field
// is "fault_tolerance". `workers` is searched by address to resolve the master, cluster-controller
// and data-distributor workers named in `db`.
Future<StatusReply> clusterGetFaultToleranceStatus(Reference<AsyncVar<struct ServerDBInfo>> const& db,
Database const& cx,
std::vector<WorkerDetails> const& workers,
ServerCoordinators const& coordinators);
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
ACTOR Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(