diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index 49e3a2bc27..b360abcb0e 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -96,7 +96,7 @@ struct CommitTransactionRequest : TimedRequest { template void serialize(Ar& ar) { - serializer(ar, *(TimedRequest*)this, transaction, reply, arena, flags, debugID); + serializer(ar, transaction, reply, arena, flags, debugID); } }; diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index e901665d8c..0ea04cbf92 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -126,25 +126,13 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "roughness":0.0 }, "grv_latency_bands":{ - "0.01": { - "hz":0.0, - "counter":0, - "roughness":0.0 - } + "$map": 1 }, "read_latency_bands":{ - "0.01": { - "hz":0.0, - "counter":0, - "roughness":0.0 - } + "$map": 1 }, "commit_latency_bands":{ - "0.01": { - "hz":0.0, - "counter":0, - "roughness":0.0 - } + "$map": 1 } } ], diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 1baee2f677..3e969915cf 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -65,8 +65,8 @@ struct ProxyStats { : cc("ProxyStats", id.toString()), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc), txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc), txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc), txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc), txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc), txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc), - txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), lastCommitVersionAssigned(0), commitLatencyBands("CommitLatency", cc), - grvLatencyBands("GRVLatency", cc) + txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc), lastCommitVersionAssigned(0), + commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY), grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;}); specialCounter(cc, "Version", [pVersion](){return *pVersion; }); @@ -1069,7 +1069,7 @@ ACTOR Future fetchVersions(ProxyCommitData *commitData) { ACTOR Future sendGrvReplies(Future replyFuture, std::vector requests, ProxyStats *stats) { GetReadVersionReply reply = wait(replyFuture); double end = timer(); - for(GetReadVersionRequest request : requests) { + for(GetReadVersionRequest const& request : requests) { stats->grvLatencyBands.addMeasurement(end - request.requestTime); request.reply.send(reply); } diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 6cb02ee115..b461cc8677 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -80,7 +80,8 @@ extern int limitReasonEnd; extern const char* limitReasonName[]; extern const char* limitReasonDesc[]; -struct WorkerEvents : std::map {}; +struct WorkerEvents : std::map {}; +typedef std::map EventMap; ACTOR static Future< Optional > latestEventOnWorker(WorkerInterface worker, std::string eventName) { try { @@ -303,7 +304,7 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector workerContribMap; std::map machineJsonMap; - for (auto worker : workers){ + for (auto const& worker : workers){ locality[worker.first.address()] = worker.first.locality; if (worker.first.locality.dcId().present()) dcIds[worker.first.address()] = worker.first.locality.dcId().get().printable(); @@ -407,62 +408,65 @@ struct MachineMemoryInfo { struct RolesInfo { std::multimap roles; + + JsonBuilderObject addLatencyBandInfo(TraceEventFields const& metrics) { + JsonBuilderObject latency; + std::map bands; + + for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) { + std::string band; + if(itr->first.substr(0, 4) == "Band") { + band = itr->first.substr(4); + } + else if(itr->first == "Filtered") { + band = "filtered"; + } + else { + continue; + } + + latency[band] = StatusCounter(itr->second).getCounter(); + } + + return latency; + } + JsonBuilderObject& addRole( NetworkAddress address, std::string const& role, UID id) { JsonBuilderObject obj; obj["id"] = id.shortString(); obj["role"] = role; return roles.insert( std::make_pair(address, obj ))->second; } - JsonBuilderObject& addRole(std::string const& role, StorageServerInterface& iface, TraceEventFields const& metrics, Version maxTLogVersion, double* pDataLagSeconds) { + JsonBuilderObject& addRole(std::string const& role, StorageServerInterface& iface, EventMap const& metrics, Version maxTLogVersion, double* pDataLagSeconds) { JsonBuilderObject obj; double dataLagSeconds = -1.0; obj["id"] = iface.id().shortString(); obj["role"] = role; try { - obj.setKeyRawNumber("stored_bytes", metrics.getValue("BytesStored")); - obj.setKeyRawNumber("kvstore_used_bytes", metrics.getValue("KvstoreBytesUsed")); - obj.setKeyRawNumber("kvstore_free_bytes", metrics.getValue("KvstoreBytesFree")); - obj.setKeyRawNumber("kvstore_available_bytes", metrics.getValue("KvstoreBytesAvailable")); - obj.setKeyRawNumber("kvstore_total_bytes", metrics.getValue("KvstoreBytesTotal")); - obj["input_bytes"] = StatusCounter(metrics.getValue("BytesInput")).getStatus(); - obj["durable_bytes"] = StatusCounter(metrics.getValue("BytesDurable")).getStatus(); - obj.setKeyRawNumber("query_queue_max", metrics.getValue("QueryQueueMax")); - obj["total_queries"] = StatusCounter(metrics.getValue("QueryQueue")).getStatus(); - obj["finished_queries"] = StatusCounter(metrics.getValue("FinishedQueries")).getStatus(); - obj["bytes_queried"] = StatusCounter(metrics.getValue("BytesQueried")).getStatus(); - obj["keys_queried"] = StatusCounter(metrics.getValue("RowsQueried")).getStatus(); - obj["mutation_bytes"] = StatusCounter(metrics.getValue("MutationBytes")).getStatus(); - obj["mutations"] = StatusCounter(metrics.getValue("Mutations")).getStatus(); + TraceEventFields const& storageMetrics = metrics.at("StorageMetrics"); - std::string latencyBandPrefix = "ReadLatency"; + obj.setKeyRawNumber("stored_bytes", storageMetrics.getValue("BytesStored")); + obj.setKeyRawNumber("kvstore_used_bytes", storageMetrics.getValue("KvstoreBytesUsed")); + obj.setKeyRawNumber("kvstore_free_bytes", storageMetrics.getValue("KvstoreBytesFree")); + obj.setKeyRawNumber("kvstore_available_bytes", storageMetrics.getValue("KvstoreBytesAvailable")); + obj.setKeyRawNumber("kvstore_total_bytes", storageMetrics.getValue("KvstoreBytesTotal")); + obj["input_bytes"] = StatusCounter(storageMetrics.getValue("BytesInput")).getStatus(); + obj["durable_bytes"] = StatusCounter(storageMetrics.getValue("BytesDurable")).getStatus(); + obj.setKeyRawNumber("query_queue_max", storageMetrics.getValue("QueryQueueMax")); + obj["total_queries"] = StatusCounter(storageMetrics.getValue("QueryQueue")).getStatus(); + obj["finished_queries"] = StatusCounter(storageMetrics.getValue("FinishedQueries")).getStatus(); + obj["bytes_queried"] = StatusCounter(storageMetrics.getValue("BytesQueried")).getStatus(); + obj["keys_queried"] = StatusCounter(storageMetrics.getValue("RowsQueried")).getStatus(); + obj["mutation_bytes"] = StatusCounter(storageMetrics.getValue("MutationBytes")).getStatus(); + obj["mutations"] = StatusCounter(storageMetrics.getValue("Mutations")).getStatus(); - JsonBuilderObject latency; - std::map bands; - - bool found = false; - for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) { - if(itr->first.substr(0, latencyBandPrefix.size()) == latencyBandPrefix) { - found = true; - std::string band = itr->first.substr(latencyBandPrefix.size()); - latency[band] = StatusCounter(itr->second).getCounter(); - } - - std::string value; - if(metrics.tryGetValue("Filtered" + latencyBandPrefix, value)) { - latency["filtered"] = StatusCounter(value).getCounter(); - } - } - if(found) { - obj["read_latency_bands"] = latency; - } - - Version version = parseInt64(metrics.getValue("Version")); - Version durableVersion = parseInt64(metrics.getValue("DurableVersion")); + Version version = parseInt64(storageMetrics.getValue("Version")); + Version durableVersion = parseInt64(storageMetrics.getValue("DurableVersion")); obj["data_version"] = version; obj["durable_version"] = durableVersion; - int64_t versionLag = parseInt64(metrics.getValue("VersionLag")); + int64_t versionLag = parseInt64(storageMetrics.getValue("VersionLag")); if(maxTLogVersion > 0) { // It's possible that the storage server hasn't talked to the logs recently, in which case it may not be aware of how far behind it is. // To account for that, we also compute the version difference between each storage server and the tlog with the largest version. @@ -472,6 +476,11 @@ struct RolesInfo { versionLag = std::max(versionLag, maxTLogVersion - version - SERVER_KNOBS->STORAGE_LOGGING_DELAY * SERVER_KNOBS->VERSIONS_PER_SECOND); } + TraceEventFields const& readLatencyMetrics = metrics.at("ReadLatencyMetrics"); + if(readLatencyMetrics.size()) { + obj["read_latency_bands"] = addLatencyBandInfo(readLatencyMetrics); + } + JsonBuilderObject dataLag; dataLag["versions"] = versionLag; dataLagSeconds = versionLag / (double)SERVER_KNOBS->VERSIONS_PER_SECOND; @@ -495,23 +504,25 @@ struct RolesInfo { return roles.insert( std::make_pair(iface.address(), obj ))->second; } - JsonBuilderObject& addRole(std::string const& role, TLogInterface& iface, TraceEventFields const& metrics, Version* pMetricVersion) { + JsonBuilderObject& addRole(std::string const& role, TLogInterface& iface, EventMap const& metrics, Version* pMetricVersion) { JsonBuilderObject obj; Version metricVersion = 0; obj["id"] = iface.id().shortString(); obj["role"] = role; try { - obj.setKeyRawNumber("kvstore_used_bytes",metrics.getValue("KvstoreBytesUsed")); - obj.setKeyRawNumber("kvstore_free_bytes",metrics.getValue("KvstoreBytesFree")); - obj.setKeyRawNumber("kvstore_available_bytes",metrics.getValue("KvstoreBytesAvailable")); - obj.setKeyRawNumber("kvstore_total_bytes",metrics.getValue("KvstoreBytesTotal")); - obj.setKeyRawNumber("queue_disk_used_bytes",metrics.getValue("QueueDiskBytesUsed")); - obj.setKeyRawNumber("queue_disk_free_bytes",metrics.getValue("QueueDiskBytesFree")); - obj.setKeyRawNumber("queue_disk_available_bytes",metrics.getValue("QueueDiskBytesAvailable")); - obj.setKeyRawNumber("queue_disk_total_bytes",metrics.getValue("QueueDiskBytesTotal")); - obj["input_bytes"] = StatusCounter(metrics.getValue("BytesInput")).getStatus(); - obj["durable_bytes"] = StatusCounter(metrics.getValue("BytesDurable")).getStatus(); - metricVersion = parseInt64(metrics.getValue("Version")); + TraceEventFields const& tlogMetrics = metrics.at("TLogMetrics"); + + obj.setKeyRawNumber("kvstore_used_bytes", tlogMetrics.getValue("KvstoreBytesUsed")); + obj.setKeyRawNumber("kvstore_free_bytes", tlogMetrics.getValue("KvstoreBytesFree")); + obj.setKeyRawNumber("kvstore_available_bytes", tlogMetrics.getValue("KvstoreBytesAvailable")); + obj.setKeyRawNumber("kvstore_total_bytes", tlogMetrics.getValue("KvstoreBytesTotal")); + obj.setKeyRawNumber("queue_disk_used_bytes", tlogMetrics.getValue("QueueDiskBytesUsed")); + obj.setKeyRawNumber("queue_disk_free_bytes", tlogMetrics.getValue("QueueDiskBytesFree")); + obj.setKeyRawNumber("queue_disk_available_bytes", tlogMetrics.getValue("QueueDiskBytesAvailable")); + obj.setKeyRawNumber("queue_disk_total_bytes", tlogMetrics.getValue("QueueDiskBytesTotal")); + obj["input_bytes"] = StatusCounter(tlogMetrics.getValue("BytesInput")).getStatus(); + obj["durable_bytes"] = StatusCounter(tlogMetrics.getValue("BytesDurable")).getStatus(); + metricVersion = parseInt64(tlogMetrics.getValue("Version")); obj["data_version"] = metricVersion; } catch (Error& e) { if(e.code() != error_code_attribute_not_found) @@ -521,47 +532,19 @@ struct RolesInfo { *pMetricVersion = metricVersion; return roles.insert( std::make_pair(iface.address(), obj ))->second; } - JsonBuilderObject& addRole(std::string const& role, MasterProxyInterface& iface, TraceEventFields const& metrics) { + JsonBuilderObject& addRole(std::string const& role, MasterProxyInterface& iface, EventMap const& metrics) { JsonBuilderObject obj; obj["id"] = iface.id().shortString(); obj["role"] = role; try { - std::string grvPrefix = "GRVLatency"; - std::string commitPrefix = "CommitLatency"; - - JsonBuilderObject grvLatency; - JsonBuilderObject commitLatency; - - bool grvFound = false; - bool commitFound = false; - for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) { - if(itr->first.substr(0, grvPrefix.size()) == grvPrefix) { - grvFound = true; - std::string band = itr->first.substr(grvPrefix.size()); - grvLatency[band] = StatusCounter(itr->second).getCounter(); - } - else if(itr->first.substr(0, commitPrefix.size()) == commitPrefix) { - commitFound = true; - std::string band = itr->first.substr(commitPrefix.size()); - commitLatency[band] = StatusCounter(itr->second).getCounter(); - } + TraceEventFields const& grvLatencyMetrics = metrics.at("GRVLatencyMetrics"); + if(grvLatencyMetrics.size()) { + obj["grv_latency_bands"] = addLatencyBandInfo(grvLatencyMetrics); } - if(grvFound) { - std::string value; - if(metrics.tryGetValue("Filtered" + grvPrefix, value)) { - grvLatency["filtered"] = StatusCounter(value).getCounter(); - } - - obj["grv_latency_bands"] = grvLatency; - } - if(commitFound) { - std::string value; - if(metrics.tryGetValue("Filtered" + commitPrefix, value)) { - commitLatency["filtered"] = StatusCounter(value).getCounter(); - } - - obj["commit_latency_bands"] = commitLatency; + TraceEventFields const& commitLatencyMetrics = metrics.at("CommitLatencyMetrics"); + if(commitLatencyMetrics.size()) { + obj["commit_latency_bands"] = addLatencyBandInfo(commitLatencyMetrics); } } catch (Error &e) { if(e.code() != error_code_attribute_not_found) { @@ -595,9 +578,9 @@ ACTOR static Future processStatusFetcher( WorkerEvents traceFileOpenErrors, WorkerEvents programStarts, std::map processIssues, - vector> storageServers, - vector> tLogs, - vector> proxies, + vector> storageServers, + vector> tLogs, + vector> proxies, Database cx, Optional configuration, std::set *incomplete_reasons) { @@ -656,13 +639,13 @@ ACTOR static Future processStatusFetcher( roles.addRole("master", db->get().master); roles.addRole("cluster_controller", db->get().clusterInterface.clientInterface); - state std::vector>::iterator proxy; + state std::vector>::iterator proxy; for(proxy = proxies.begin(); proxy != proxies.end(); ++proxy) { roles.addRole( "proxy", proxy->first, proxy->second ); wait(yield()); } - state std::vector>::iterator log; + state std::vector>::iterator log; state Version maxTLogVersion = 0; // Get largest TLog version @@ -673,7 +656,7 @@ ACTOR static Future processStatusFetcher( wait(yield()); } - state std::vector>::iterator ss; + state std::vector>::iterator ss; state std::map ssLag; state double lagSeconds; for(ss = storageServers.begin(); ss != storageServers.end(); ++ss) { @@ -1303,34 +1286,50 @@ namespace std } ACTOR template -static Future>> getServerMetrics(vector servers, std::unordered_map address_workers, std::string eventName, bool useId) { +static Future>> getServerMetrics(vector servers, std::unordered_map address_workers, std::vector eventNames) { state vector>> futures; for (auto s : servers) { - futures.push_back(latestEventOnWorker(address_workers[s.address()], (useId ? s.id().toString() + "/" + eventName : eventName))); + for (auto name : eventNames) { + futures.push_back(latestEventOnWorker(address_workers[s.address()], s.id().toString() + "/" + name)); + } } wait(waitForAll(futures)); - vector> results; + vector> results; + auto futureItr = futures.begin(); + for (int i = 0; i < servers.size(); i++) { - results.push_back(std::make_pair(servers[i], futures[i].get().present() ? futures[i].get().get() : TraceEventFields())); + EventMap serverResults; + for (auto name : eventNames) { + ASSERT(futureItr != futures.end()); + serverResults[name] = futureItr->get().present() ? futureItr->get().get() : TraceEventFields(); + ++futureItr; + } + + results.push_back(std::make_pair(servers[i], serverResults)); } + return results; } -ACTOR static Future>> getStorageServersAndMetrics(Database cx, std::unordered_map address_workers) { +ACTOR static Future>> getStorageServersAndMetrics(Database cx, std::unordered_map address_workers) { vector servers = wait(timeoutError(getStorageServers(cx, true), 5.0)); - vector> results = wait(getServerMetrics(servers, address_workers, "StorageMetrics", true)); + vector> results = wait(getServerMetrics(servers, address_workers, + std::vector{ "StorageMetrics", "ReadLatencyMetrics" })); + return results; } -ACTOR static Future>> getTLogsAndMetrics(Reference> db, std::unordered_map address_workers) { +ACTOR static Future>> getTLogsAndMetrics(Reference> db, std::unordered_map address_workers) { vector servers = db->get().logSystemConfig.allPresentLogs(); - vector> results = wait(getServerMetrics(servers, address_workers, "TLogMetrics", true)); + vector> results = wait(getServerMetrics(servers, address_workers, + std::vector{ "TLogMetrics" })); + return results; } -ACTOR static Future>> getProxiesAndMetrics(Database cx, std::unordered_map address_workers) { +ACTOR static Future>> getProxiesAndMetrics(Database cx, std::unordered_map address_workers) { Reference proxyInfo = cx->getMasterProxies(); std::vector servers; if(proxyInfo) { @@ -1339,14 +1338,16 @@ ACTOR static Future>> g } } - vector> results = wait(getServerMetrics(servers, address_workers, "ProxyMetrics", false)); + vector> results = wait(getServerMetrics(servers, address_workers, + std::vector{ "GRVLatencyMetrics", "CommitLatencyMetrics" })); + return results; } static int getExtraTLogEligibleMachines(vector> workers, DatabaseConfiguration configuration) { std::set allMachines; std::map> dcId_machine; - for(auto worker : workers) { + for(auto const& worker : workers) { if(worker.second.machineClassFitness(ProcessClass::TLog) < ProcessClass::NeverAssign && !configuration.isExcludedServer(worker.first.address())) { @@ -1383,7 +1384,7 @@ static int getExtraTLogEligibleMachines(vector workloadStatusFetcher(Reference> db, vector> workers, std::pair mWorker, - JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set *incomplete_reasons, Future>>> storageServerFuture) + JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set *incomplete_reasons, Future>>> storageServerFuture) { state JsonBuilderObject statusObj; state JsonBuilderObject operationsObj; @@ -1394,7 +1395,7 @@ ACTOR static Future workloadStatusFetcher(Reference> proxyStatFutures; std::map> workersMap; - for (auto w : workers) { + for (auto const& w : workers) { workersMap[w.first.address()] = w; } for (auto &p : db->get().client.proxies) { @@ -1486,7 +1487,7 @@ ACTOR static Future workloadStatusFetcher(Reference>> storageServers = wait(storageServerFuture); + ErrorOr>> storageServers = wait(storageServerFuture); if(!storageServers.present()) { throw storageServers.getError(); } @@ -1497,10 +1498,12 @@ ACTOR static Future workloadStatusFetcher(Reference clusterGetStatus( } state std::map processIssues = getProcessIssuesAsMessages(workerIssues); - state vector> storageServers; - state vector> tLogs; - state vector> proxies; + state vector> storageServers; + state vector> tLogs; + state vector> proxies; state JsonBuilderObject qos; state JsonBuilderObject data_overlay; @@ -1914,13 +1917,13 @@ ACTOR Future clusterGetStatus( // Start getting storage servers now (using system priority) concurrently. Using sys priority because having storage servers // in status output is important to give context to error messages in status that reference a storage server role ID. state std::unordered_map address_workers; - for (auto worker : workers) { + for (auto const& worker : workers) { address_workers[worker.first.address()] = worker.first; } - state Future>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers)); - state Future>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers)); - state Future>>> proxyFuture = errorOr(getProxiesAndMetrics(cx, address_workers)); + state Future>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers)); + state Future>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers)); + state Future>>> proxyFuture = errorOr(getProxiesAndMetrics(cx, address_workers)); state int minReplicasRemaining = -1; std::vector> futures2; @@ -1973,7 +1976,7 @@ ACTOR Future clusterGetStatus( } // Need storage servers now for processStatusFetcher() below. - ErrorOr>> _storageServers = wait(storageServerFuture); + ErrorOr>> _storageServers = wait(storageServerFuture); if (_storageServers.present()) { storageServers = _storageServers.get(); } @@ -1982,7 +1985,7 @@ ACTOR Future clusterGetStatus( } // ...also tlogs - ErrorOr>> _tLogs = wait(tLogFuture); + ErrorOr>> _tLogs = wait(tLogFuture); if (_tLogs.present()) { tLogs = _tLogs.get(); } @@ -1991,7 +1994,7 @@ ACTOR Future clusterGetStatus( } // ...also proxies - ErrorOr>> _proxies = wait(proxyFuture); + ErrorOr>> _proxies = wait(proxyFuture); if (_proxies.present()) { proxies = _proxies.get(); } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 35bb70e32d..b4a4d88ea4 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -459,7 +459,7 @@ public: fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc), - readLatencyBands("ReadLatency", cc) + readLatencyBands("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; }); specialCounter(cc, "Version", [self](){ return self->version.get(); }); diff --git a/fdbserver/workloads/StatusWorkload.actor.cpp b/fdbserver/workloads/StatusWorkload.actor.cpp index d3b4aaafdc..5cdd52c468 100644 --- a/fdbserver/workloads/StatusWorkload.actor.cpp +++ b/fdbserver/workloads/StatusWorkload.actor.cpp @@ -31,6 +31,9 @@ extern bool noUnseed; struct StatusWorkload : TestWorkload { double testDuration, requestsPerSecond; + bool enableLatencyBands; + + Future latencyBandActor; PerfIntCounter requests, replies, errors, totalSize; Optional parsedSchema; @@ -41,6 +44,7 @@ struct StatusWorkload : TestWorkload { { testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0); requestsPerSecond = getOption(options, LiteralStringRef("requestsPerSecond"), 0.5); + enableLatencyBands = getOption(options, LiteralStringRef("enableLatencyBands"), g_random->random01() < 0.5); auto statusSchemaStr = getOption(options, LiteralStringRef("schema"), JSONSchemas::statusSchema); if (statusSchemaStr.size()) { json_spirit::mValue schema = readJSONStrictly(statusSchemaStr.toString()); @@ -55,6 +59,10 @@ struct StatusWorkload : TestWorkload { virtual std::string description() { return "StatusWorkload"; } virtual Future setup(Database const& cx) { + if(enableLatencyBands) { + latencyBandActor = configureLatencyBands(this, cx); + } + return Void(); } virtual Future start(Database const& cx) { @@ -103,6 +111,56 @@ struct StatusWorkload : TestWorkload { } } + static std::string generateBands() { + int numBands = g_random->randomInt(0, 10); + std::vector bands; + + while(bands.size() < numBands) { + bands.push_back(g_random->random01() * pow(10, g_random->randomInt(-5, 1))); + } + + std::string result = "\"bands\":["; + for(int i = 0; i < bands.size(); ++i) { + if(i > 0) { + result += ","; + } + + result += format("%f", bands[i]); + } + + return result + "]"; + } + + ACTOR Future configureLatencyBands(StatusWorkload *self, Database cx) { + loop { + state Transaction tr(cx); + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + + std::string config = "{" + "\"get_read_version\":{" + generateBands() + "}," + "\"read\":{" + generateBands() + format(", \"max_key_selector_offset\":%d, \"max_read_bytes\":%d},", g_random->randomInt(0, 10000), g_random->randomInt(0, 1000000)) + "" + "\"commit\":{" + generateBands() + format(", \"max_commit_bytes\":%d", g_random->randomInt(0, 1000000)) + "}" + "}"; + + tr.set(latencyBandConfigKey, ValueRef(config)); + wait(tr.commit()); + + if(g_random->random01() < 0.3) { + return; + } + + wait(delay(g_random->random01() * 120)); + } + catch(Error &e) { + wait(tr.onError(e)); + } + } + } + } + ACTOR Future fetcher(Reference connFile, StatusWorkload *self) { state double lastTime = now(); @@ -131,7 +189,6 @@ struct StatusWorkload : TestWorkload { } } } - }; WorkloadFactory StatusWorkloadFactory("Status"); diff --git a/flow/Stats.h b/flow/Stats.h index ee233ac188..8044c4d802 100644 --- a/flow/Stats.h +++ b/flow/Stats.h @@ -44,9 +44,6 @@ struct TimedRequest { TimedRequest() { requestTime = timer(); } - - template - void serialize(Ar& ar) {} }; struct ICounter { @@ -121,14 +118,19 @@ struct SpecialCounter : ICounter, FastAllocated>, NonCopyable template static void specialCounter(CounterCollection& collection, std::string const& name, F && f) { new SpecialCounter(collection, name, std::move(f)); } +Future traceCounters(std::string const& traceEventName, UID const& traceEventID, double const& interval, CounterCollection* const& counters, std::string const& trackLatestName = std::string()); + class LatencyBands { public: - LatencyBands(std::string name, CounterCollection &cc) : name(name), cc(cc), filteredCount(nullptr) {} + LatencyBands(std::string name, UID id, double loggingInterval) : name(name), id(id), loggingInterval(loggingInterval), cc(nullptr), filteredCount(nullptr) {} void addThreshold(double value) { if(value > 0 && bands.count(value) == 0) { if(bands.size() == 0) { - filteredCount = new Counter(format("Filtered%s", name.c_str()), cc); + ASSERT(!cc && !filteredCount); + cc = new CounterCollection(name, id.toString()); + logger = traceCounters(name, id, loggingInterval, cc, id.toString() + "/" + name); + filteredCount = new Counter("Filtered", *cc); insertBand(std::numeric_limits::infinity()); } @@ -148,6 +150,8 @@ public: } void clearBands() { + logger = Void(); + for(auto itr : bands) { delete itr.second; } @@ -155,6 +159,10 @@ public: bands.clear(); delete filteredCount; + delete cc; + + filteredCount = nullptr; + cc = nullptr; } ~LatencyBands() { @@ -166,14 +174,14 @@ private: Counter *filteredCount; std::string name; - CounterCollection &cc; + UID id; + double loggingInterval; + + CounterCollection *cc; + Future logger; void insertBand(double value) { - bands.insert(std::make_pair(value, new Counter(format("%s%f", name.c_str(), value), cc))); + bands.insert(std::make_pair(value, new Counter(format("Band%f", value), *cc))); } }; - - -Future traceCounters(std::string const& traceEventName, UID const& traceEventID, double const& interval, CounterCollection* const& counters, std::string const& trackLatestName = std::string()); - #endif