refactor Status retrieval

This commit is contained in:
Xiaoxi Wang 2022-05-03 15:26:24 -07:00
parent 940512f208
commit 75a90be0dd
2 changed files with 71 additions and 64 deletions

View File

@ -33,6 +33,7 @@
#include "flow/FastRef.h"
#include "flow/ProtocolVersion.h"
#include "flow/flow.h"
#include "fdbclient/Status.h"
enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 };
@ -1473,6 +1474,14 @@ struct StorageMetadataType {
void serialize(Ar& ar) {
serializer(ar, createdTime, storeType);
}
StatusObject toJSON() const {
StatusObject result;
result["created_time_timestamp"] = createdTime;
result["created_time_datetime"] = epochsToGMTString(createdTime);
result["storage_engine"] = storeType.toString();
return result;
}
};
// store metadata of wiggle action

View File

@ -103,6 +103,14 @@ extern const char* limitReasonDesc[];
typedef std::map<std::string, TraceEventFields> EventMap;
struct StorageServerStatusInfo : public StorageServerInterface {
Optional<StorageMetadataType> metadata;
EventMap eventMap;
StorageServerStatusInfo(const StorageServerInterface& interface,
Optional<StorageMetadataType> metadata = Optional<StorageMetadataType>())
: StorageServerInterface(interface), metadata(metadata) {}
};
ACTOR static Future<Optional<TraceEventFields>> latestEventOnWorker(WorkerInterface worker, std::string eventName) {
try {
EventLogRequest req =
@ -468,12 +476,13 @@ struct RolesInfo {
obj["role"] = role;
return roles.insert(std::make_pair(address, obj))->second;
}
JsonBuilderObject& addRole(std::string const& role,
StorageServerInterface& iface,
EventMap const& metrics,
StorageServerStatusInfo& iface,
Version maxTLogVersion,
double* pDataLagSeconds) {
JsonBuilderObject obj;
EventMap const& metrics = iface.eventMap;
double dataLagSeconds = -1.0;
obj["id"] = iface.id().shortString();
obj["role"] = role;
@ -584,13 +593,8 @@ struct RolesInfo {
}
}
if (!iface.isTss()) { // only storage server has Metadata field
TraceEventFields const& metadata = metrics.at("Metadata");
JsonBuilderObject metadataObj;
metadataObj["created_time_datetime"] = metadata.getValue("CreatedTimeDatetime");
metadataObj["created_time_timestamp"] = metadata.getDouble("CreatedTimeTimestamp");
metadataObj["storage_engine"] = metadata.getValue("StoreType");
obj["storage_metadata"] = metadataObj;
if (iface.metadata.present()) {
obj["storage_metadata"] = iface.metadata.get().toJSON();
// printf("%s\n", metadataObj.getJson().c_str());
}
@ -731,7 +735,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
WorkerEvents traceFileOpenErrors,
WorkerEvents programStarts,
std::map<std::string, std::vector<JsonBuilderObject>> processIssues,
std::vector<std::pair<StorageServerInterface, EventMap>> storageServers,
std::vector<StorageServerStatusInfo> storageServers,
std::vector<std::pair<TLogInterface, EventMap>> tLogs,
std::vector<std::pair<CommitProxyInterface, EventMap>> commitProxies,
std::vector<std::pair<GrvProxyInterface, EventMap>> grvProxies,
@ -861,13 +865,13 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
wait(yield());
}
state std::vector<std::pair<StorageServerInterface, EventMap>>::iterator ss;
state std::vector<StorageServerStatusInfo>::iterator ss;
state std::map<NetworkAddress, double> ssLag;
state double lagSeconds;
for (ss = storageServers.begin(); ss != storageServers.end(); ++ss) {
roles.addRole("storage", ss->first, ss->second, maxTLogVersion, &lagSeconds);
roles.addRole("storage", *ss, maxTLogVersion, &lagSeconds);
if (lagSeconds != -1.0) {
ssLag[ss->first.address()] = lagSeconds;
ssLag[ss->address()] = lagSeconds;
}
wait(yield());
}
@ -1919,74 +1923,69 @@ static Future<std::vector<TraceEventFields>> getServerBusiestWriteTags(
}
ACTOR
static Future<std::vector<Optional<StorageMetadataType>>> getServerMetadata(std::vector<StorageServerInterface> servers,
Database cx,
bool use_system_priority) {
static Future<std::vector<StorageServerStatusInfo>> readStorageInterfaceAndMetadata(Database cx,
bool use_system_priority) {
state KeyBackedObjectMap<UID, StorageMetadataType, decltype(IncludeVersion())> metadataMap(serverMetadataKeys.begin,
IncludeVersion());
state std::vector<Optional<StorageMetadataType>> res(servers.size());
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state std::vector<StorageServerStatusInfo> servers;
loop {
try {
servers.clear();
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
if (use_system_priority) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
state int i = 0;
for (i = 0; i < servers.size(); ++i) {
Optional<StorageMetadataType> metadata = wait(metadataMap.get(tr, servers[i].id(), Snapshot::True));
// TraceEvent(SevDebug, "MetadataAppear", servers[i].id()).detail("Present", metadata.present());
res[i] = metadata;
state RangeResult serverList = wait(tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
servers.reserve(serverList.size());
for (int i = 0; i < serverList.size(); i++) {
servers.push_back(StorageServerStatusInfo(decodeServerListValue(serverList[i].value)));
}
state std::vector<Future<Void>> futures(servers.size());
for (int i = 0; i < servers.size(); ++i) {
auto& info = servers[i];
futures[i] = fmap(
[&info](Optional<StorageMetadataType> meta) -> Void {
info.metadata = meta;
return Void();
},
metadataMap.get(tr, servers[i].id()));
// TraceEvent(SevDebug, "MetadataAppear", servers[i].id()).detail("Present", metadata.present());
}
wait(waitForAll(futures));
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
return res;
return servers;
}
ACTOR static Future<std::vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(
ACTOR static Future<std::vector<StorageServerStatusInfo>> getStorageServerStatusInfos(
Database cx,
std::unordered_map<NetworkAddress, WorkerInterface> address_workers,
WorkerDetails rkWorker) {
state std::vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
state std::vector<std::pair<StorageServerInterface, EventMap>> results;
state std::vector<StorageServerStatusInfo> servers =
wait(timeoutError(readStorageInterfaceAndMetadata(cx, true), 5.0));
state std::vector<std::pair<StorageServerStatusInfo, EventMap>> results;
state std::vector<TraceEventFields> busiestWriteTags;
state std::vector<Optional<StorageMetadataType>> metadata;
wait(store(results,
getServerMetrics(servers,
address_workers,
std::vector<std::string>{
"StorageMetrics", "ReadLatencyMetrics", "ReadLatencyBands", "BusiestReadTag" })) &&
store(busiestWriteTags, getServerBusiestWriteTags(servers, address_workers, rkWorker)) &&
store(metadata, getServerMetadata(servers, cx, true)));
store(busiestWriteTags, getServerBusiestWriteTags(servers, address_workers, rkWorker)));
ASSERT(busiestWriteTags.size() == results.size() && metadata.size() == results.size());
ASSERT(busiestWriteTags.size() == results.size());
for (int i = 0; i < results.size(); ++i) {
results[i].second.emplace("BusiestWriteTag", busiestWriteTags[i]);
// FIXME: it's possible that a SS is removed between `getStorageServers` and `getServerMetadata`. Maybe we can
// read StorageServer and Metadata in an atomic transaction?
if (metadata[i].present()) {
TraceEventFields metadataField;
metadataField.addField("CreatedTimeTimestamp", std::to_string(metadata[i].get().createdTime));
metadataField.addField("CreatedTimeDatetime", epochsToGMTString(metadata[i].get().createdTime));
metadataField.addField("StoreType", metadata[i].get().storeType.toString());
results[i].second.emplace("Metadata", metadataField);
} else if (!servers[i].isTss()) {
TraceEventFields metadataField;
metadataField.addField("CreatedTimeTimestamp", "0");
metadataField.addField("CreatedTimeDatetime", "[removed]");
metadataField.addField("StoreType", KeyValueStoreType::getStoreTypeStr(KeyValueStoreType::END));
results[i].second.emplace("Metadata", metadataField);
}
servers[i].eventMap = std::move(results[i].second);
servers[i].eventMap.emplace("BusiestWriteTag", busiestWriteTags[i]);
}
return results;
return servers;
}
ACTOR static Future<std::vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetrics(
@ -2103,7 +2102,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(
JsonBuilderObject* qos,
JsonBuilderObject* data_overlay,
std::set<std::string>* incomplete_reasons,
Future<ErrorOr<std::vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture) {
Future<ErrorOr<std::vector<StorageServerStatusInfo>>> storageServerFuture) {
state JsonBuilderObject statusObj;
state JsonBuilderObject operationsObj;
state JsonBuilderObject bytesObj;
@ -2275,7 +2274,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(
// Reads
try {
ErrorOr<std::vector<std::pair<StorageServerInterface, EventMap>>> storageServers = wait(storageServerFuture);
ErrorOr<std::vector<StorageServerStatusInfo>> storageServers = wait(storageServerFuture);
if (!storageServers.present()) {
throw storageServers.getError();
}
@ -2287,7 +2286,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(
StatusCounter lowPriorityReads;
for (auto& ss : storageServers.get()) {
TraceEventFields const& storageMetrics = ss.second.at("StorageMetrics");
TraceEventFields const& storageMetrics = ss.eventMap.at("StorageMetrics");
if (storageMetrics.size() > 0) {
readRequests.updateValues(StatusCounter(storageMetrics.getValue("QueryQueue")));
@ -2318,14 +2317,14 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(
ACTOR static Future<JsonBuilderObject> clusterSummaryStatisticsFetcher(
WorkerEvents pMetrics,
Future<ErrorOr<std::vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture,
Future<ErrorOr<std::vector<StorageServerStatusInfo>>> storageServerFuture,
Future<ErrorOr<std::vector<std::pair<TLogInterface, EventMap>>>> tlogFuture,
std::set<std::string>* incomplete_reasons) {
state JsonBuilderObject statusObj;
try {
state JsonBuilderObject cacheStatistics;
ErrorOr<std::vector<std::pair<StorageServerInterface, EventMap>>> storageServers = wait(storageServerFuture);
ErrorOr<std::vector<StorageServerStatusInfo>> storageServers = wait(storageServerFuture);
if (!storageServers.present()) {
throw storageServers.getError();
@ -2335,7 +2334,7 @@ ACTOR static Future<JsonBuilderObject> clusterSummaryStatisticsFetcher(
double storageCacheMissesHz = 0;
for (auto& ss : storageServers.get()) {
auto processMetrics = pMetrics.find(ss.first.address());
auto processMetrics = pMetrics.find(ss.address());
if (processMetrics != pMetrics.end()) {
int64_t hits = processMetrics->second.getInt64("CacheHits");
int64_t misses = processMetrics->second.getInt64("CacheMisses");
@ -2947,7 +2946,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state std::map<std::string, std::vector<JsonBuilderObject>> processIssues =
getProcessIssuesAsMessages(workerIssues);
state std::vector<std::pair<StorageServerInterface, EventMap>> storageServers;
state std::vector<StorageServerStatusInfo> storageServers;
state std::vector<std::pair<TLogInterface, EventMap>> tLogs;
state std::vector<std::pair<CommitProxyInterface, EventMap>> commitProxies;
state std::vector<std::pair<GrvProxyInterface, EventMap>> grvProxies;
@ -3021,8 +3020,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
address_workers[worker.interf.address()] = worker.interf;
}
state Future<ErrorOr<std::vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture =
errorOr(getStorageServersAndMetrics(cx, address_workers, rkWorker));
state Future<ErrorOr<std::vector<StorageServerStatusInfo>>> storageServerFuture =
errorOr(getStorageServerStatusInfos(cx, address_workers, rkWorker));
state Future<ErrorOr<std::vector<std::pair<TLogInterface, EventMap>>>> tLogFuture =
errorOr(getTLogsAndMetrics(db, address_workers));
state Future<ErrorOr<std::vector<std::pair<CommitProxyInterface, EventMap>>>> commitProxyFuture =
@ -3136,8 +3135,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// Need storage servers now for processStatusFetcher() below.
ErrorOr<std::vector<std::pair<StorageServerInterface, EventMap>>> _storageServers =
wait(storageServerFuture);
ErrorOr<std::vector<StorageServerStatusInfo>> _storageServers = wait(storageServerFuture);
if (_storageServers.present()) {
storageServers = _storageServers.get();
} else {
@ -3225,11 +3223,11 @@ ACTOR Future<StatusReply> clusterGetStatus(
int activeTSSCount = 0;
JsonBuilderArray wiggleServerAddress;
for (auto& it : storageServers) {
if (it.first.isTss()) {
if (it.isTss()) {
activeTSSCount++;
}
if (wiggleServers.count(it.first.id())) {
wiggleServerAddress.push_back(it.first.address().toString());
if (wiggleServers.count(it.id())) {
wiggleServerAddress.push_back(it.address().toString());
}
}
statusObj["active_tss_count"] = activeTSSCount;