Rework the way latency counters are tracked. Report the latency bands in events separate from StorageMetrics and ProxyMetrics. Fix a problem that occurred when the latency band configuration was changed. Add correctness testing.

This commit is contained in:
A.J. Beamon 2019-02-07 13:39:22 -08:00
parent 4925b61d0e
commit d4349293b9
7 changed files with 210 additions and 154 deletions

View File

@ -96,7 +96,7 @@ struct CommitTransactionRequest : TimedRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, *(TimedRequest*)this, transaction, reply, arena, flags, debugID);
serializer(ar, transaction, reply, arena, flags, debugID);
}
};

View File

@ -126,25 +126,13 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"roughness":0.0
},
"grv_latency_bands":{
"0.01": {
"hz":0.0,
"counter":0,
"roughness":0.0
}
"$map": 1
},
"read_latency_bands":{
"0.01": {
"hz":0.0,
"counter":0,
"roughness":0.0
}
"$map": 1
},
"commit_latency_bands":{
"0.01": {
"hz":0.0,
"counter":0,
"roughness":0.0
}
"$map": 1
}
}
],

View File

@ -65,8 +65,8 @@ struct ProxyStats {
: cc("ProxyStats", id.toString()),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc), txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc), txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc), txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), lastCommitVersionAssigned(0), commitLatencyBands("CommitLatency", cc),
grvLatencyBands("GRVLatency", cc)
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), lastCommitVersionAssigned(0),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
{
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
specialCounter(cc, "Version", [pVersion](){return *pVersion; });
@ -1069,7 +1069,7 @@ ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
GetReadVersionReply reply = wait(replyFuture);
double end = timer();
for(GetReadVersionRequest request : requests) {
for(GetReadVersionRequest const& request : requests) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime);
request.reply.send(reply);
}

View File

@ -81,6 +81,7 @@ extern const char* limitReasonName[];
extern const char* limitReasonDesc[];
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
typedef std::map<std::string, TraceEventFields> EventMap;
ACTOR static Future< Optional<TraceEventFields> > latestEventOnWorker(WorkerInterface worker, std::string eventName) {
try {
@ -303,7 +304,7 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
std::map<std::string, int32_t> workerContribMap;
std::map<std::string, JsonBuilderObject> machineJsonMap;
for (auto worker : workers){
for (auto const& worker : workers){
locality[worker.first.address()] = worker.first.locality;
if (worker.first.locality.dcId().present())
dcIds[worker.first.address()] = worker.first.locality.dcId().get().printable();
@ -407,62 +408,65 @@ struct MachineMemoryInfo {
struct RolesInfo {
std::multimap<NetworkAddress, JsonBuilderObject> roles;
// Converts the fields of a latency metrics trace event into a JSON object of band counters.
//
// Each field is handled by name:
//   - "Band<suffix>" is reported under the key "<suffix>" (the text after the "Band" prefix);
//   - "Filtered" is reported under the key "filtered";
//   - any other field is ignored.
// The reported value is StatusCounter(<field value>).getCounter().
JsonBuilderObject addLatencyBandInfo(TraceEventFields const& metrics) {
	const std::string bandPrefix = "Band";
	JsonBuilderObject latency;

	for(auto const& field : metrics) {
		std::string band;
		// compare() checks the prefix in place, avoiding a temporary substring per field.
		if(field.first.compare(0, bandPrefix.size(), bandPrefix) == 0) {
			band = field.first.substr(bandPrefix.size());
		}
		else if(field.first == "Filtered") {
			band = "filtered";
		}
		else {
			continue;
		}

		latency[band] = StatusCounter(field.second).getCounter();
	}

	return latency;
}
// Registers a role that has no metrics attached: the entry carries only the short form of
// `id` and the role name. The entry is stored in `roles` under `address`, and a reference
// to the stored JSON object is returned.
JsonBuilderObject& addRole( NetworkAddress address, std::string const& role, UID id) {
	JsonBuilderObject roleJson;
	roleJson["id"] = id.shortString();
	roleJson["role"] = role;

	auto inserted = roles.insert( std::make_pair(address, roleJson) );
	return inserted->second;
}
JsonBuilderObject& addRole(std::string const& role, StorageServerInterface& iface, TraceEventFields const& metrics, Version maxTLogVersion, double* pDataLagSeconds) {
JsonBuilderObject& addRole(std::string const& role, StorageServerInterface& iface, EventMap const& metrics, Version maxTLogVersion, double* pDataLagSeconds) {
JsonBuilderObject obj;
double dataLagSeconds = -1.0;
obj["id"] = iface.id().shortString();
obj["role"] = role;
try {
obj.setKeyRawNumber("stored_bytes", metrics.getValue("BytesStored"));
obj.setKeyRawNumber("kvstore_used_bytes", metrics.getValue("KvstoreBytesUsed"));
obj.setKeyRawNumber("kvstore_free_bytes", metrics.getValue("KvstoreBytesFree"));
obj.setKeyRawNumber("kvstore_available_bytes", metrics.getValue("KvstoreBytesAvailable"));
obj.setKeyRawNumber("kvstore_total_bytes", metrics.getValue("KvstoreBytesTotal"));
obj["input_bytes"] = StatusCounter(metrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(metrics.getValue("BytesDurable")).getStatus();
obj.setKeyRawNumber("query_queue_max", metrics.getValue("QueryQueueMax"));
obj["total_queries"] = StatusCounter(metrics.getValue("QueryQueue")).getStatus();
obj["finished_queries"] = StatusCounter(metrics.getValue("FinishedQueries")).getStatus();
obj["bytes_queried"] = StatusCounter(metrics.getValue("BytesQueried")).getStatus();
obj["keys_queried"] = StatusCounter(metrics.getValue("RowsQueried")).getStatus();
obj["mutation_bytes"] = StatusCounter(metrics.getValue("MutationBytes")).getStatus();
obj["mutations"] = StatusCounter(metrics.getValue("Mutations")).getStatus();
TraceEventFields const& storageMetrics = metrics.at("StorageMetrics");
std::string latencyBandPrefix = "ReadLatency";
obj.setKeyRawNumber("stored_bytes", storageMetrics.getValue("BytesStored"));
obj.setKeyRawNumber("kvstore_used_bytes", storageMetrics.getValue("KvstoreBytesUsed"));
obj.setKeyRawNumber("kvstore_free_bytes", storageMetrics.getValue("KvstoreBytesFree"));
obj.setKeyRawNumber("kvstore_available_bytes", storageMetrics.getValue("KvstoreBytesAvailable"));
obj.setKeyRawNumber("kvstore_total_bytes", storageMetrics.getValue("KvstoreBytesTotal"));
obj["input_bytes"] = StatusCounter(storageMetrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(storageMetrics.getValue("BytesDurable")).getStatus();
obj.setKeyRawNumber("query_queue_max", storageMetrics.getValue("QueryQueueMax"));
obj["total_queries"] = StatusCounter(storageMetrics.getValue("QueryQueue")).getStatus();
obj["finished_queries"] = StatusCounter(storageMetrics.getValue("FinishedQueries")).getStatus();
obj["bytes_queried"] = StatusCounter(storageMetrics.getValue("BytesQueried")).getStatus();
obj["keys_queried"] = StatusCounter(storageMetrics.getValue("RowsQueried")).getStatus();
obj["mutation_bytes"] = StatusCounter(storageMetrics.getValue("MutationBytes")).getStatus();
obj["mutations"] = StatusCounter(storageMetrics.getValue("Mutations")).getStatus();
JsonBuilderObject latency;
std::map<std::string, JsonBuilderObject> bands;
bool found = false;
for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) {
if(itr->first.substr(0, latencyBandPrefix.size()) == latencyBandPrefix) {
found = true;
std::string band = itr->first.substr(latencyBandPrefix.size());
latency[band] = StatusCounter(itr->second).getCounter();
}
std::string value;
if(metrics.tryGetValue("Filtered" + latencyBandPrefix, value)) {
latency["filtered"] = StatusCounter(value).getCounter();
}
}
if(found) {
obj["read_latency_bands"] = latency;
}
Version version = parseInt64(metrics.getValue("Version"));
Version durableVersion = parseInt64(metrics.getValue("DurableVersion"));
Version version = parseInt64(storageMetrics.getValue("Version"));
Version durableVersion = parseInt64(storageMetrics.getValue("DurableVersion"));
obj["data_version"] = version;
obj["durable_version"] = durableVersion;
int64_t versionLag = parseInt64(metrics.getValue("VersionLag"));
int64_t versionLag = parseInt64(storageMetrics.getValue("VersionLag"));
if(maxTLogVersion > 0) {
// It's possible that the storage server hasn't talked to the logs recently, in which case it may not be aware of how far behind it is.
// To account for that, we also compute the version difference between each storage server and the tlog with the largest version.
@ -472,6 +476,11 @@ struct RolesInfo {
versionLag = std::max<int64_t>(versionLag, maxTLogVersion - version - SERVER_KNOBS->STORAGE_LOGGING_DELAY * SERVER_KNOBS->VERSIONS_PER_SECOND);
}
TraceEventFields const& readLatencyMetrics = metrics.at("ReadLatencyMetrics");
if(readLatencyMetrics.size()) {
obj["read_latency_bands"] = addLatencyBandInfo(readLatencyMetrics);
}
JsonBuilderObject dataLag;
dataLag["versions"] = versionLag;
dataLagSeconds = versionLag / (double)SERVER_KNOBS->VERSIONS_PER_SECOND;
@ -495,23 +504,25 @@ struct RolesInfo {
return roles.insert( std::make_pair(iface.address(), obj ))->second;
}
JsonBuilderObject& addRole(std::string const& role, TLogInterface& iface, TraceEventFields const& metrics, Version* pMetricVersion) {
JsonBuilderObject& addRole(std::string const& role, TLogInterface& iface, EventMap const& metrics, Version* pMetricVersion) {
JsonBuilderObject obj;
Version metricVersion = 0;
obj["id"] = iface.id().shortString();
obj["role"] = role;
try {
obj.setKeyRawNumber("kvstore_used_bytes",metrics.getValue("KvstoreBytesUsed"));
obj.setKeyRawNumber("kvstore_free_bytes",metrics.getValue("KvstoreBytesFree"));
obj.setKeyRawNumber("kvstore_available_bytes",metrics.getValue("KvstoreBytesAvailable"));
obj.setKeyRawNumber("kvstore_total_bytes",metrics.getValue("KvstoreBytesTotal"));
obj.setKeyRawNumber("queue_disk_used_bytes",metrics.getValue("QueueDiskBytesUsed"));
obj.setKeyRawNumber("queue_disk_free_bytes",metrics.getValue("QueueDiskBytesFree"));
obj.setKeyRawNumber("queue_disk_available_bytes",metrics.getValue("QueueDiskBytesAvailable"));
obj.setKeyRawNumber("queue_disk_total_bytes",metrics.getValue("QueueDiskBytesTotal"));
obj["input_bytes"] = StatusCounter(metrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(metrics.getValue("BytesDurable")).getStatus();
metricVersion = parseInt64(metrics.getValue("Version"));
TraceEventFields const& tlogMetrics = metrics.at("TLogMetrics");
obj.setKeyRawNumber("kvstore_used_bytes", tlogMetrics.getValue("KvstoreBytesUsed"));
obj.setKeyRawNumber("kvstore_free_bytes", tlogMetrics.getValue("KvstoreBytesFree"));
obj.setKeyRawNumber("kvstore_available_bytes", tlogMetrics.getValue("KvstoreBytesAvailable"));
obj.setKeyRawNumber("kvstore_total_bytes", tlogMetrics.getValue("KvstoreBytesTotal"));
obj.setKeyRawNumber("queue_disk_used_bytes", tlogMetrics.getValue("QueueDiskBytesUsed"));
obj.setKeyRawNumber("queue_disk_free_bytes", tlogMetrics.getValue("QueueDiskBytesFree"));
obj.setKeyRawNumber("queue_disk_available_bytes", tlogMetrics.getValue("QueueDiskBytesAvailable"));
obj.setKeyRawNumber("queue_disk_total_bytes", tlogMetrics.getValue("QueueDiskBytesTotal"));
obj["input_bytes"] = StatusCounter(tlogMetrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(tlogMetrics.getValue("BytesDurable")).getStatus();
metricVersion = parseInt64(tlogMetrics.getValue("Version"));
obj["data_version"] = metricVersion;
} catch (Error& e) {
if(e.code() != error_code_attribute_not_found)
@ -521,47 +532,19 @@ struct RolesInfo {
*pMetricVersion = metricVersion;
return roles.insert( std::make_pair(iface.address(), obj ))->second;
}
JsonBuilderObject& addRole(std::string const& role, MasterProxyInterface& iface, TraceEventFields const& metrics) {
JsonBuilderObject& addRole(std::string const& role, MasterProxyInterface& iface, EventMap const& metrics) {
JsonBuilderObject obj;
obj["id"] = iface.id().shortString();
obj["role"] = role;
try {
std::string grvPrefix = "GRVLatency";
std::string commitPrefix = "CommitLatency";
JsonBuilderObject grvLatency;
JsonBuilderObject commitLatency;
bool grvFound = false;
bool commitFound = false;
for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) {
if(itr->first.substr(0, grvPrefix.size()) == grvPrefix) {
grvFound = true;
std::string band = itr->first.substr(grvPrefix.size());
grvLatency[band] = StatusCounter(itr->second).getCounter();
}
else if(itr->first.substr(0, commitPrefix.size()) == commitPrefix) {
commitFound = true;
std::string band = itr->first.substr(commitPrefix.size());
commitLatency[band] = StatusCounter(itr->second).getCounter();
}
TraceEventFields const& grvLatencyMetrics = metrics.at("GRVLatencyMetrics");
if(grvLatencyMetrics.size()) {
obj["grv_latency_bands"] = addLatencyBandInfo(grvLatencyMetrics);
}
if(grvFound) {
std::string value;
if(metrics.tryGetValue("Filtered" + grvPrefix, value)) {
grvLatency["filtered"] = StatusCounter(value).getCounter();
}
obj["grv_latency_bands"] = grvLatency;
}
if(commitFound) {
std::string value;
if(metrics.tryGetValue("Filtered" + commitPrefix, value)) {
commitLatency["filtered"] = StatusCounter(value).getCounter();
}
obj["commit_latency_bands"] = commitLatency;
TraceEventFields const& commitLatencyMetrics = metrics.at("CommitLatencyMetrics");
if(commitLatencyMetrics.size()) {
obj["commit_latency_bands"] = addLatencyBandInfo(commitLatencyMetrics);
}
} catch (Error &e) {
if(e.code() != error_code_attribute_not_found) {
@ -595,9 +578,9 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
WorkerEvents traceFileOpenErrors,
WorkerEvents programStarts,
std::map<std::string, JsonBuilderObject> processIssues,
vector<std::pair<StorageServerInterface, TraceEventFields>> storageServers,
vector<std::pair<TLogInterface, TraceEventFields>> tLogs,
vector<std::pair<MasterProxyInterface, TraceEventFields>> proxies,
vector<std::pair<StorageServerInterface, EventMap>> storageServers,
vector<std::pair<TLogInterface, EventMap>> tLogs,
vector<std::pair<MasterProxyInterface, EventMap>> proxies,
Database cx,
Optional<DatabaseConfiguration> configuration,
std::set<std::string> *incomplete_reasons) {
@ -656,13 +639,13 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
roles.addRole("master", db->get().master);
roles.addRole("cluster_controller", db->get().clusterInterface.clientInterface);
state std::vector<std::pair<MasterProxyInterface, TraceEventFields>>::iterator proxy;
state std::vector<std::pair<MasterProxyInterface, EventMap>>::iterator proxy;
for(proxy = proxies.begin(); proxy != proxies.end(); ++proxy) {
roles.addRole( "proxy", proxy->first, proxy->second );
wait(yield());
}
state std::vector<std::pair<TLogInterface, TraceEventFields>>::iterator log;
state std::vector<std::pair<TLogInterface, EventMap>>::iterator log;
state Version maxTLogVersion = 0;
// Get largest TLog version
@ -673,7 +656,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
wait(yield());
}
state std::vector<std::pair<StorageServerInterface, TraceEventFields>>::iterator ss;
state std::vector<std::pair<StorageServerInterface, EventMap>>::iterator ss;
state std::map<NetworkAddress, double> ssLag;
state double lagSeconds;
for(ss = storageServers.begin(); ss != storageServers.end(); ++ss) {
@ -1303,34 +1286,50 @@ namespace std
}
ACTOR template <class iface>
static Future<vector<std::pair<iface, TraceEventFields>>> getServerMetrics(vector<iface> servers, std::unordered_map<NetworkAddress, WorkerInterface> address_workers, std::string eventName, bool useId) {
static Future<vector<std::pair<iface, EventMap>>> getServerMetrics(vector<iface> servers, std::unordered_map<NetworkAddress, WorkerInterface> address_workers, std::vector<std::string> eventNames) {
state vector<Future<Optional<TraceEventFields>>> futures;
for (auto s : servers) {
futures.push_back(latestEventOnWorker(address_workers[s.address()], (useId ? s.id().toString() + "/" + eventName : eventName)));
for (auto name : eventNames) {
futures.push_back(latestEventOnWorker(address_workers[s.address()], s.id().toString() + "/" + name));
}
}
wait(waitForAll(futures));
vector<std::pair<iface, TraceEventFields>> results;
vector<std::pair<iface, EventMap>> results;
auto futureItr = futures.begin();
for (int i = 0; i < servers.size(); i++) {
results.push_back(std::make_pair(servers[i], futures[i].get().present() ? futures[i].get().get() : TraceEventFields()));
EventMap serverResults;
for (auto name : eventNames) {
ASSERT(futureItr != futures.end());
serverResults[name] = futureItr->get().present() ? futureItr->get().get() : TraceEventFields();
++futureItr;
}
results.push_back(std::make_pair(servers[i], serverResults));
}
return results;
}
ACTOR static Future<vector<std::pair<StorageServerInterface, TraceEventFields>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
vector<std::pair<StorageServerInterface, TraceEventFields>> results = wait(getServerMetrics(servers, address_workers, "StorageMetrics", true));
vector<std::pair<StorageServerInterface, EventMap>> results = wait(getServerMetrics(servers, address_workers,
std::vector<std::string>{ "StorageMetrics", "ReadLatencyMetrics" }));
return results;
}
ACTOR static Future<vector<std::pair<TLogInterface, TraceEventFields>>> getTLogsAndMetrics(Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
ACTOR static Future<vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetrics(Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<TLogInterface> servers = db->get().logSystemConfig.allPresentLogs();
vector<std::pair<TLogInterface, TraceEventFields>> results = wait(getServerMetrics(servers, address_workers, "TLogMetrics", true));
vector<std::pair<TLogInterface, EventMap>> results = wait(getServerMetrics(servers, address_workers,
std::vector<std::string>{ "TLogMetrics" }));
return results;
}
ACTOR static Future<vector<std::pair<MasterProxyInterface, TraceEventFields>>> getProxiesAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
ACTOR static Future<vector<std::pair<MasterProxyInterface, EventMap>>> getProxiesAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
Reference<ProxyInfo> proxyInfo = cx->getMasterProxies();
std::vector<MasterProxyInterface> servers;
if(proxyInfo) {
@ -1339,14 +1338,16 @@ ACTOR static Future<vector<std::pair<MasterProxyInterface, TraceEventFields>>> g
}
}
vector<std::pair<MasterProxyInterface, TraceEventFields>> results = wait(getServerMetrics(servers, address_workers, "ProxyMetrics", false));
vector<std::pair<MasterProxyInterface, EventMap>> results = wait(getServerMetrics(servers, address_workers,
std::vector<std::string>{ "GRVLatencyMetrics", "CommitLatencyMetrics" }));
return results;
}
static int getExtraTLogEligibleMachines(vector<std::pair<WorkerInterface, ProcessClass>> workers, DatabaseConfiguration configuration) {
std::set<StringRef> allMachines;
std::map<Key,std::set<StringRef>> dcId_machine;
for(auto worker : workers) {
for(auto const& worker : workers) {
if(worker.second.machineClassFitness(ProcessClass::TLog) < ProcessClass::NeverAssign
&& !configuration.isExcludedServer(worker.first.address()))
{
@ -1383,7 +1384,7 @@ static int getExtraTLogEligibleMachines(vector<std::pair<WorkerInterface, Proces
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>>> storageServerFuture)
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
state JsonBuilderObject statusObj;
state JsonBuilderObject operationsObj;
@ -1394,7 +1395,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
try {
vector<Future<TraceEventFields>> proxyStatFutures;
std::map<NetworkAddress, std::pair<WorkerInterface, ProcessClass>> workersMap;
for (auto w : workers) {
for (auto const& w : workers) {
workersMap[w.first.address()] = w;
}
for (auto &p : db->get().client.proxies) {
@ -1486,7 +1487,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Reads
try {
ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>> storageServers = wait(storageServerFuture);
ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>> storageServers = wait(storageServerFuture);
if(!storageServers.present()) {
throw storageServers.getError();
}
@ -1497,10 +1498,12 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
StatusCounter readBytes;
for(auto &ss : storageServers.get()) {
readRequests.updateValues( StatusCounter(ss.second.getValue("QueryQueue")));
reads.updateValues( StatusCounter(ss.second.getValue("FinishedQueries")));
readKeys.updateValues( StatusCounter(ss.second.getValue("RowsQueried")));
readBytes.updateValues( StatusCounter(ss.second.getValue("BytesQueried")));
TraceEventFields const& storageMetrics = ss.second.at("StorageMetrics");
readRequests.updateValues( StatusCounter(storageMetrics.getValue("QueryQueue")));
reads.updateValues( StatusCounter(storageMetrics.getValue("FinishedQueries")));
readKeys.updateValues( StatusCounter(storageMetrics.getValue("RowsQueried")));
readBytes.updateValues( StatusCounter(storageMetrics.getValue("BytesQueried")));
}
operationsObj["read_requests"] = readRequests.getStatus();
@ -1877,9 +1880,9 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
state std::map<std::string, JsonBuilderObject> processIssues = getProcessIssuesAsMessages(workerIssues);
state vector<std::pair<StorageServerInterface, TraceEventFields>> storageServers;
state vector<std::pair<TLogInterface, TraceEventFields>> tLogs;
state vector<std::pair<MasterProxyInterface, TraceEventFields>> proxies;
state vector<std::pair<StorageServerInterface, EventMap>> storageServers;
state vector<std::pair<TLogInterface, EventMap>> tLogs;
state vector<std::pair<MasterProxyInterface, EventMap>> proxies;
state JsonBuilderObject qos;
state JsonBuilderObject data_overlay;
@ -1914,13 +1917,13 @@ ACTOR Future<StatusReply> clusterGetStatus(
// Start getting storage servers now (using system priority) concurrently. Using sys priority because having storage servers
// in status output is important to give context to error messages in status that reference a storage server role ID.
state std::unordered_map<NetworkAddress, WorkerInterface> address_workers;
for (auto worker : workers) {
for (auto const& worker : workers) {
address_workers[worker.first.address()] = worker.first;
}
state Future<ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers));
state Future<ErrorOr<vector<std::pair<TLogInterface, TraceEventFields>>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers));
state Future<ErrorOr<vector<std::pair<MasterProxyInterface, TraceEventFields>>>> proxyFuture = errorOr(getProxiesAndMetrics(cx, address_workers));
state Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers));
state Future<ErrorOr<vector<std::pair<TLogInterface, EventMap>>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers));
state Future<ErrorOr<vector<std::pair<MasterProxyInterface, EventMap>>>> proxyFuture = errorOr(getProxiesAndMetrics(cx, address_workers));
state int minReplicasRemaining = -1;
std::vector<Future<JsonBuilderObject>> futures2;
@ -1973,7 +1976,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// Need storage servers now for processStatusFetcher() below.
ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>> _storageServers = wait(storageServerFuture);
ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>> _storageServers = wait(storageServerFuture);
if (_storageServers.present()) {
storageServers = _storageServers.get();
}
@ -1982,7 +1985,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// ...also tlogs
ErrorOr<vector<std::pair<TLogInterface, TraceEventFields>>> _tLogs = wait(tLogFuture);
ErrorOr<vector<std::pair<TLogInterface, EventMap>>> _tLogs = wait(tLogFuture);
if (_tLogs.present()) {
tLogs = _tLogs.get();
}
@ -1991,7 +1994,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// ...also proxies
ErrorOr<vector<std::pair<MasterProxyInterface, TraceEventFields>>> _proxies = wait(proxyFuture);
ErrorOr<vector<std::pair<MasterProxyInterface, EventMap>>> _proxies = wait(proxyFuture);
if (_proxies.present()) {
proxies = _proxies.get();
}

View File

@ -459,7 +459,7 @@ public:
fetchWaitingCount("FetchWaitingCount", cc),
fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc),
readLatencyBands("ReadLatency", cc)
readLatencyBands("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
{
specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; });
specialCounter(cc, "Version", [self](){ return self->version.get(); });

View File

@ -31,6 +31,9 @@ extern bool noUnseed;
struct StatusWorkload : TestWorkload {
double testDuration, requestsPerSecond;
bool enableLatencyBands;
Future<Void> latencyBandActor;
PerfIntCounter requests, replies, errors, totalSize;
Optional<StatusObject> parsedSchema;
@ -41,6 +44,7 @@ struct StatusWorkload : TestWorkload {
{
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
requestsPerSecond = getOption(options, LiteralStringRef("requestsPerSecond"), 0.5);
enableLatencyBands = getOption(options, LiteralStringRef("enableLatencyBands"), g_random->random01() < 0.5);
auto statusSchemaStr = getOption(options, LiteralStringRef("schema"), JSONSchemas::statusSchema);
if (statusSchemaStr.size()) {
json_spirit::mValue schema = readJSONStrictly(statusSchemaStr.toString());
@ -55,6 +59,10 @@ struct StatusWorkload : TestWorkload {
virtual std::string description() { return "StatusWorkload"; }
virtual Future<Void> setup(Database const& cx) {
if(enableLatencyBands) {
latencyBandActor = configureLatencyBands(this, cx);
}
return Void();
}
virtual Future<Void> start(Database const& cx) {
@ -103,6 +111,56 @@ struct StatusWorkload : TestWorkload {
}
}
// Produces the text `"bands":[v1,v2,...]` holding a random number of random thresholds.
// The count comes from randomInt(0, 10); each threshold is random01() scaled by a random
// power of ten (exponent from randomInt(-5, 1)) and printed with "%f".
// NOTE(review): the bounds of randomInt/random01 are defined elsewhere -- presumably
// randomInt excludes its upper argument; confirm against the random number interface.
static std::string generateBands() {
	const int bandCount = g_random->randomInt(0, 10);

	std::string result = "\"bands\":[";
	for(int idx = 0; idx < bandCount; ++idx) {
		double threshold = g_random->random01() * pow(10, g_random->randomInt(-5, 1));
		if(idx > 0) {
			result += ",";
		}
		result += format("%f", threshold);
	}
	result += "]";

	return result;
}
// Repeatedly writes a randomly generated latency band configuration to latencyBandConfigKey.
//
// Each configuration is a JSON document with "get_read_version", "read" and "commit" sections,
// each holding a random band list from generateBands() plus random limits for that section.
// After every successful commit the actor stops if random01() < 0.3; otherwise it waits
// random01() * 120 (a delay() argument) and writes a new configuration. Commit errors are
// passed to tr.onError() and the write is attempted again.
//
// `self` is not used by this actor.
ACTOR Future<Void> configureLatencyBands(StatusWorkload *self, Database cx) {
	loop {
		state Transaction tr(cx);
		loop {
			try {
				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr.setOption(FDBTransactionOptions::LOCK_AWARE);

				std::string config = "{"
					"\"get_read_version\":{" + generateBands() + "},"
					"\"read\":{" + generateBands() + format(", \"max_key_selector_offset\":%d, \"max_read_bytes\":%d},", g_random->randomInt(0, 10000), g_random->randomInt(0, 1000000)) + ""
					"\"commit\":{" + generateBands() + format(", \"max_commit_bytes\":%d", g_random->randomInt(0, 1000000)) + "}"
				"}";

				tr.set(latencyBandConfigKey, ValueRef(config));
				wait(tr.commit());

				// The next configuration write must start from a clean transaction rather than
				// continue on one that has already committed.
				tr.reset();

				if(g_random->random01() < 0.3) {
					return;
				}

				wait(delay(g_random->random01() * 120));
			}
			catch(Error &e) {
				wait(tr.onError(e));
			}
		}
	}
}
ACTOR Future<Void> fetcher(Reference<ClusterConnectionFile> connFile, StatusWorkload *self) {
state double lastTime = now();
@ -131,7 +189,6 @@ struct StatusWorkload : TestWorkload {
}
}
}
};
WorkloadFactory<StatusWorkload> StatusWorkloadFactory("Status");

View File

@ -44,9 +44,6 @@ struct TimedRequest {
TimedRequest() {
requestTime = timer();
}
template <class Ar>
void serialize(Ar& ar) {}
};
struct ICounter {
@ -121,14 +118,19 @@ struct SpecialCounter : ICounter, FastAllocated<SpecialCounter<F>>, NonCopyable
template <class F>
static void specialCounter(CounterCollection& collection, std::string const& name, F && f) { new SpecialCounter<F>(collection, name, std::move(f)); }
Future<Void> traceCounters(std::string const& traceEventName, UID const& traceEventID, double const& interval, CounterCollection* const& counters, std::string const& trackLatestName = std::string());
class LatencyBands {
public:
LatencyBands(std::string name, CounterCollection &cc) : name(name), cc(cc), filteredCount(nullptr) {}
LatencyBands(std::string name, UID id, double loggingInterval) : name(name), id(id), loggingInterval(loggingInterval), cc(nullptr), filteredCount(nullptr) {}
void addThreshold(double value) {
if(value > 0 && bands.count(value) == 0) {
if(bands.size() == 0) {
filteredCount = new Counter(format("Filtered%s", name.c_str()), cc);
ASSERT(!cc && !filteredCount);
cc = new CounterCollection(name, id.toString());
logger = traceCounters(name, id, loggingInterval, cc, id.toString() + "/" + name);
filteredCount = new Counter("Filtered", *cc);
insertBand(std::numeric_limits<double>::infinity());
}
@ -148,6 +150,8 @@ public:
}
void clearBands() {
logger = Void();
for(auto itr : bands) {
delete itr.second;
}
@ -155,6 +159,10 @@ public:
bands.clear();
delete filteredCount;
delete cc;
filteredCount = nullptr;
cc = nullptr;
}
~LatencyBands() {
@ -166,14 +174,14 @@ private:
Counter *filteredCount;
std::string name;
CounterCollection &cc;
UID id;
double loggingInterval;
CounterCollection *cc;
Future<Void> logger;
void insertBand(double value) {
bands.insert(std::make_pair(value, new Counter(format("%s%f", name.c_str(), value), cc)));
bands.insert(std::make_pair(value, new Counter(format("Band%f", value), *cc)));
}
};
Future<Void> traceCounters(std::string const& traceEventName, UID const& traceEventID, double const& interval, CounterCollection* const& counters, std::string const& trackLatestName = std::string());
#endif