Added the ability to configure the latency band settings by setting a special key in \xff keyspace.
This commit is contained in:
parent
7498c2308c
commit
8e05e95045
|
@ -1650,7 +1650,7 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
|
|||
StatusObject configJSON = config.get_obj();
|
||||
|
||||
json_spirit::mValue schema;
|
||||
if(!json_spirit::read_string( JSONSchemas::configurationSchema.toString(), schema )) {
|
||||
if(!json_spirit::read_string( JSONSchemas::clusterConfigurationSchema.toString(), schema )) {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
|
|
|
@ -1553,120 +1553,121 @@ void schemaCoverage( std::string const& spath, bool covered ) {
|
|||
}
|
||||
}
|
||||
|
||||
bool schemaMatch( StatusObject const schema, StatusObject const result, std::string& errorStr, Severity sev, bool checkCoverage, std::string path, std::string schema_path ) {
|
||||
bool schemaMatch( json_spirit::mValue const& schemaValue, json_spirit::mValue const& resultValue, std::string& errorStr, Severity sev, bool checkCoverage, std::string path, std::string schemaPath ) {
|
||||
// Returns true if everything in `result` is permitted by `schema`
|
||||
|
||||
// Really this should recurse on "values" rather than "objects"?
|
||||
|
||||
bool ok = true;
|
||||
|
||||
try {
|
||||
for(auto& rkv : result) {
|
||||
auto& key = rkv.first;
|
||||
auto& rv = rkv.second;
|
||||
std::string kpath = path + "." + key;
|
||||
std::string spath = schema_path + "." + key;
|
||||
if(normJSONType(schemaValue.type()) != normJSONType(resultValue.type())) {
|
||||
errorStr += format("ERROR: Incorrect value type for key `%s'\n", path.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", path).detail("SchemaType", schemaValue.type()).detail("ValueType", resultValue.type());
|
||||
return false;
|
||||
}
|
||||
|
||||
if(checkCoverage) schemaCoverage(spath);
|
||||
if(resultValue.type() == json_spirit::obj_type) {
|
||||
auto& result = resultValue.get_obj();
|
||||
auto& schema = schemaValue.get_obj();
|
||||
|
||||
if (!schema.count(key)) {
|
||||
errorStr += format("ERROR: Unknown key `%s'\n", kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaPath", spath);
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
auto& sv = schema.at(key);
|
||||
for(auto& rkv : result) {
|
||||
auto& key = rkv.first;
|
||||
auto& rv = rkv.second;
|
||||
std::string kpath = path + "." + key;
|
||||
std::string spath = schemaPath + "." + key;
|
||||
|
||||
if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$enum")) {
|
||||
auto& enum_values = sv.get_obj().at("$enum").get_array();
|
||||
if(checkCoverage) {
|
||||
schemaCoverage(spath);
|
||||
}
|
||||
|
||||
bool any_match = false;
|
||||
for(auto& enum_item : enum_values)
|
||||
if (enum_item == rv) {
|
||||
any_match = true;
|
||||
if(checkCoverage) schemaCoverage(spath + ".$enum." + enum_item.get_str());
|
||||
break;
|
||||
if(!schema.count(key)) {
|
||||
errorStr += format("ERROR: Unknown key `%s'\n", kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaPath", spath);
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
auto& sv = schema.at(key);
|
||||
|
||||
if(sv.type() == json_spirit::obj_type && sv.get_obj().count("$enum")) {
|
||||
auto& enum_values = sv.get_obj().at("$enum").get_array();
|
||||
|
||||
bool any_match = false;
|
||||
for(auto& enum_item : enum_values)
|
||||
if(enum_item == rv) {
|
||||
any_match = true;
|
||||
if(checkCoverage) {
|
||||
schemaCoverage(spath + ".$enum." + enum_item.get_str());
|
||||
}
|
||||
break;
|
||||
}
|
||||
if(!any_match) {
|
||||
errorStr += format("ERROR: Unknown value `%s' for key `%s'\n", json_spirit::write_string(rv).c_str(), kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaEnumItems", enum_values.size()).detail("Value", json_spirit::write_string(rv));
|
||||
if(checkCoverage) {
|
||||
schemaCoverage(spath + ".$enum." + json_spirit::write_string(rv));
|
||||
}
|
||||
ok = false;
|
||||
}
|
||||
if (!any_match) {
|
||||
errorStr += format("ERROR: Unknown value `%s' for key `%s'\n", json_spirit::write_string(rv).c_str(), kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaEnumItems", enum_values.size()).detail("Value", json_spirit::write_string(rv));
|
||||
if(checkCoverage) schemaCoverage(spath + ".$enum." + json_spirit::write_string(rv));
|
||||
ok = false;
|
||||
}
|
||||
} else if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$map")) {
|
||||
if (rv.type() != json_spirit::obj_type) {
|
||||
errorStr += format("ERROR: Expected an object as the value for key `%s'\n", kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
if(sv.get_obj().at("$map").type() != json_spirit::obj_type) {
|
||||
continue;
|
||||
}
|
||||
auto& schema_obj = sv.get_obj().at("$map").get_obj();
|
||||
auto& value_obj = rv.get_obj();
|
||||
|
||||
if(checkCoverage) schemaCoverage(spath + ".$map");
|
||||
|
||||
for(auto& value_pair : value_obj) {
|
||||
auto vpath = kpath + "[" + value_pair.first + "]";
|
||||
auto upath = spath + ".$map";
|
||||
if (value_pair.second.type() != json_spirit::obj_type) {
|
||||
errorStr += format("ERROR: Expected an object for `%s'\n", vpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", vpath).detail("ValueType", value_pair.second.type());
|
||||
} else if(sv.type() == json_spirit::obj_type && sv.get_obj().count("$map")) {
|
||||
if(rv.type() != json_spirit::obj_type) {
|
||||
errorStr += format("ERROR: Expected an object as the value for key `%s'\n", kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
if (!schemaMatch(schema_obj, value_pair.second.get_obj(), errorStr, sev, checkCoverage, vpath, upath))
|
||||
ok = false;
|
||||
}
|
||||
} else {
|
||||
// The schema entry isn't an operator, so it asserts a type and (depending on the type) recursive schema definition
|
||||
if (normJSONType(sv.type()) != normJSONType(rv.type())) {
|
||||
errorStr += format("ERROR: Incorrect value type for key `%s'\n", kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
if (rv.type() == json_spirit::array_type) {
|
||||
auto& value_array = rv.get_array();
|
||||
auto& schema_array = sv.get_array();
|
||||
if (!schema_array.size()) {
|
||||
// An empty schema array means that the value array is required to be empty
|
||||
if (value_array.size()) {
|
||||
errorStr += format("ERROR: Expected an empty array for key `%s'\n", kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaSize", schema_array.size()).detail("ValueSize", value_array.size());
|
||||
if(sv.get_obj().at("$map").type() != json_spirit::obj_type) {
|
||||
continue;
|
||||
}
|
||||
auto& schemaVal = sv.get_obj().at("$map");
|
||||
auto& valueObj = rv.get_obj();
|
||||
|
||||
if(checkCoverage) {
|
||||
schemaCoverage(spath + ".$map");
|
||||
}
|
||||
|
||||
for(auto& valuePair : valueObj) {
|
||||
auto vpath = kpath + "[" + valuePair.first + "]";
|
||||
auto upath = spath + ".$map";
|
||||
if (valuePair.second.type() != json_spirit::obj_type) {
|
||||
errorStr += format("ERROR: Expected an object for `%s'\n", vpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", vpath).detail("ValueType", valuePair.second.type());
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
} else if (schema_array.size() == 1 && schema_array[0].type() == json_spirit::obj_type) {
|
||||
// A one item schema array means that all items in the value must match the first item in the schema
|
||||
auto& schema_obj = schema_array[0].get_obj();
|
||||
int index = 0;
|
||||
for(auto &value_item : value_array) {
|
||||
if (value_item.type() != json_spirit::obj_type) {
|
||||
errorStr += format("ERROR: Expected all array elements to be objects for key `%s'\n", kpath.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath + format("[%d]",index)).detail("ValueType", value_item.type());
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
if (!schemaMatch(schema_obj, value_item.get_obj(), errorStr, sev, checkCoverage, kpath + format("[%d]", index), spath + "[0]"))
|
||||
ok = false;
|
||||
index++;
|
||||
if(!schemaMatch(schemaVal, valuePair.second, errorStr, sev, checkCoverage, vpath, upath)) {
|
||||
ok = false;
|
||||
}
|
||||
} else
|
||||
ASSERT(false); // Schema doesn't make sense
|
||||
} else if (rv.type() == json_spirit::obj_type) {
|
||||
auto& schema_obj = sv.get_obj();
|
||||
auto& value_obj = rv.get_obj();
|
||||
if (!schemaMatch(schema_obj, value_obj, errorStr, sev, checkCoverage, kpath, spath))
|
||||
}
|
||||
} else {
|
||||
if(!schemaMatch(sv, rv, errorStr, sev, checkCoverage, kpath, spath)) {
|
||||
ok = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if(resultValue.type() == json_spirit::array_type) {
|
||||
auto& valueArray = resultValue.get_array();
|
||||
auto& schemaArray = schemaValue.get_array();
|
||||
if(!schemaArray.size()) {
|
||||
// An empty schema array means that the value array is required to be empty
|
||||
if(valueArray.size()) {
|
||||
errorStr += format("ERROR: Expected an empty array for key `%s'\n", path.c_str());
|
||||
TraceEvent(sev, "SchemaMismatch").detail("Path", path).detail("SchemaSize", schemaArray.size()).detail("ValueSize", valueArray.size());
|
||||
return false;
|
||||
}
|
||||
} else if(schemaArray.size() == 1) {
|
||||
// A one item schema array means that all items in the value must match the first item in the schema
|
||||
int index = 0;
|
||||
for(auto &valueItem : valueArray) {
|
||||
if(!schemaMatch(schemaArray[0], valueItem, errorStr, sev, checkCoverage, path + format("[%d]", index), schemaPath + "[0]")) {
|
||||
ok = false;
|
||||
}
|
||||
index++;
|
||||
}
|
||||
} else {
|
||||
ASSERT(false); // Schema doesn't make sense
|
||||
}
|
||||
}
|
||||
return ok;
|
||||
} catch (std::exception& e) {
|
||||
TraceEvent(SevError, "SchemaMatchException").detail("What", e.what()).detail("Path", path).detail("SchemaPath", schema_path);
|
||||
TraceEvent(SevError, "SchemaMatchException").detail("What", e.what()).detail("Path", path).detail("SchemaPath", schemaPath);
|
||||
throw unknown_error();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -177,6 +177,6 @@ Future<Void> waitForPrimaryDC( Database const& cx, StringRef const& dcId );
|
|||
Future<std::vector<NetworkAddress>> getCoordinators( Database const& cx );
|
||||
|
||||
void schemaCoverage( std::string const& spath, bool covered=true );
|
||||
bool schemaMatch( StatusObject const schema, StatusObject const result, std::string& errorStr, Severity sev=SevError, bool checkCoverage=false, std::string path = std::string(), std::string schema_path = std::string() );
|
||||
bool schemaMatch( json_spirit::mValue const& schema, json_spirit::mValue const& result, std::string& errorStr, Severity sev=SevError, bool checkCoverage=false, std::string path = std::string(), std::string schema_path = std::string() );
|
||||
|
||||
#endif
|
||||
|
|
|
@ -124,6 +124,27 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"hz":0.0,
|
||||
"counter":0,
|
||||
"roughness":0.0
|
||||
},
|
||||
"grv_latency_bands":{
|
||||
"0.01": {
|
||||
"hz":0.0,
|
||||
"counter":0,
|
||||
"roughness":0.0
|
||||
}
|
||||
},
|
||||
"read_latency_bands":{
|
||||
"0.01": {
|
||||
"hz":0.0,
|
||||
"counter":0,
|
||||
"roughness":0.0
|
||||
}
|
||||
},
|
||||
"commit_latency_bands":{
|
||||
"0.01": {
|
||||
"hz":0.0,
|
||||
"counter":0,
|
||||
"roughness":0.0
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
|
@ -603,7 +624,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
}
|
||||
})statusSchema");
|
||||
|
||||
const KeyRef JSONSchemas::configurationSchema = LiteralStringRef(R"configSchema(
|
||||
const KeyRef JSONSchemas::clusterConfigurationSchema = LiteralStringRef(R"configSchema(
|
||||
{
|
||||
"create":{
|
||||
"$enum":[
|
||||
|
@ -670,3 +691,25 @@ const KeyRef JSONSchemas::configurationSchema = LiteralStringRef(R"configSchema(
|
|||
"auto_logs":3,
|
||||
"proxies":5
|
||||
})configSchema");
|
||||
|
||||
const KeyRef JSONSchemas::latencyBandConfigurationSchema = LiteralStringRef(R"configSchema(
|
||||
{
|
||||
"get_read_version":{
|
||||
"bands":[
|
||||
0.0
|
||||
]
|
||||
},
|
||||
"read":{
|
||||
"bands":[
|
||||
0.0
|
||||
],
|
||||
"max_key_selector_offset":0,
|
||||
"max_read_bytes":0
|
||||
},
|
||||
"commit":{
|
||||
"bands":[
|
||||
0.0
|
||||
],
|
||||
"max_commit_bytes":0
|
||||
}
|
||||
})configSchema");
|
||||
|
|
|
@ -28,7 +28,8 @@
|
|||
|
||||
struct JSONSchemas {
|
||||
static const KeyRef statusSchema;
|
||||
static const KeyRef configurationSchema;
|
||||
static const KeyRef clusterConfigurationSchema;
|
||||
static const KeyRef latencyBandConfigurationSchema;
|
||||
};
|
||||
|
||||
#endif /* FDBCLIENT_SCHEMAS_H */
|
||||
|
|
|
@ -434,6 +434,9 @@ const KeyRangeRef fdbClientInfoPrefixRange(LiteralStringRef("\xff\x02/fdbClientI
|
|||
const KeyRef fdbClientInfoTxnSampleRate = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_sample_rate/");
|
||||
const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("\xff\x02/fdbClientInfo/client_txn_size_limit/");
|
||||
|
||||
// Request latency measurement key
|
||||
const KeyRef latencyBandConfigKey = LiteralStringRef("\xff\x02/latencyBandConfig");
|
||||
|
||||
// Keyspace to maintain wall clock to version map
|
||||
const KeyRangeRef timeKeeperPrefixRange(LiteralStringRef("\xff\x02/timeKeeper/map/"), LiteralStringRef("\xff\x02/timeKeeper/map0"));
|
||||
const KeyRef timeKeeperVersionKey = LiteralStringRef("\xff\x02/timeKeeper/version");
|
||||
|
|
|
@ -212,6 +212,9 @@ extern const KeyRangeRef fdbClientInfoPrefixRange;
|
|||
extern const KeyRef fdbClientInfoTxnSampleRate;
|
||||
extern const KeyRef fdbClientInfoTxnSizeLimit;
|
||||
|
||||
// Request latency measurement key
|
||||
extern const KeyRef latencyBandConfigKey;
|
||||
|
||||
// Keyspace to maintain wall clock to version map
|
||||
extern const KeyRangeRef timeKeeperPrefixRange;
|
||||
extern const KeyRef timeKeeperVersionKey;
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "ClusterRecruitmentInterface.h"
|
||||
#include "ServerDBInfo.h"
|
||||
#include "Status.h"
|
||||
#include "fdbserver/LatencyBandConfig.h"
|
||||
#include <algorithm>
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "RecoveryState.h"
|
||||
|
@ -1960,6 +1961,42 @@ ACTOR Future<Void> monitorProcessClasses(ClusterControllerData *self) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> monitorServerInfoConfig(ClusterControllerData::DBInfo* db) {
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(db->db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
|
||||
Optional<Value> configVal = wait(tr.get(latencyBandConfigKey));
|
||||
Optional<LatencyBandConfig> config;
|
||||
if(configVal.present()) {
|
||||
config = LatencyBandConfig::parse(configVal.get());
|
||||
}
|
||||
|
||||
ServerDBInfo serverInfo = db->serverInfo->get();
|
||||
if(config != serverInfo.latencyBandConfig) {
|
||||
TraceEvent("LatencyBandConfigChanged").detail("Present", config.present());
|
||||
serverInfo.id = g_random->randomUniqueID();
|
||||
serverInfo.latencyBandConfig = config;
|
||||
db->serverInfo->set(serverInfo);
|
||||
}
|
||||
|
||||
state Future<Void> configChangeFuture = tr.watch(latencyBandConfigKey);
|
||||
Void _ = wait(tr.commit());
|
||||
Void _ = wait(configChangeFuture);
|
||||
|
||||
break;
|
||||
}
|
||||
catch (Error &e) {
|
||||
Void _ = wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db) {
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(db->db);
|
||||
|
@ -2183,6 +2220,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
|
|||
addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));
|
||||
addActor.send( timeKeeper(&self) );
|
||||
addActor.send( monitorProcessClasses(&self) );
|
||||
addActor.send( monitorServerInfoConfig(&self.db) );
|
||||
addActor.send( monitorClientTxnInfoConfigs(&self.db) );
|
||||
addActor.send( updatedChangingDatacenters(&self) );
|
||||
addActor.send( updatedChangedDatacenters(&self) );
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* LatencyBandConfig.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/LatencyBandConfig.h"
|
||||
|
||||
#include "fdbclient/ManagementAPI.h"
|
||||
#include "fdbclient/Schemas.h"
|
||||
|
||||
bool operator==(LatencyBandConfig::RequestConfig const& lhs, LatencyBandConfig::RequestConfig const& rhs) {
|
||||
return typeid(lhs) == typeid(rhs) && lhs.isEqual(rhs);
|
||||
}
|
||||
|
||||
bool operator!=(LatencyBandConfig::RequestConfig const& lhs, LatencyBandConfig::RequestConfig const& rhs) {
|
||||
return !(lhs == rhs);
|
||||
}
|
||||
|
||||
bool LatencyBandConfig::RequestConfig::isEqual(RequestConfig const& r) const {
|
||||
return bands == r.bands;
|
||||
};
|
||||
|
||||
void LatencyBandConfig::RequestConfig::fromJson(JSONDoc json) {
|
||||
json_spirit::mArray bandsArray;
|
||||
if(json.get("bands", bandsArray)) {
|
||||
for(auto b : bandsArray) {
|
||||
bands.insert(b.get_real());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void LatencyBandConfig::ReadConfig::fromJson(JSONDoc json) {
|
||||
RequestConfig::fromJson(json);
|
||||
|
||||
int value;
|
||||
if(json.get("max_read_bytes", value)) {
|
||||
maxReadBytes = value;
|
||||
}
|
||||
if(json.get("max_key_selector_offset", value)) {
|
||||
maxKeySelectorOffset = value;
|
||||
}
|
||||
}
|
||||
|
||||
bool LatencyBandConfig::ReadConfig::isEqual(RequestConfig const& r) const {
|
||||
ReadConfig const& other = static_cast<ReadConfig const&>(r);
|
||||
return RequestConfig::isEqual(r) && maxReadBytes == other.maxReadBytes && maxKeySelectorOffset == other.maxKeySelectorOffset;
|
||||
}
|
||||
|
||||
void LatencyBandConfig::CommitConfig::fromJson(JSONDoc json) {
|
||||
RequestConfig::fromJson(json);
|
||||
|
||||
int value;
|
||||
if(json.get("max_commit_bytes", value)) {
|
||||
maxCommitBytes = value;
|
||||
}
|
||||
}
|
||||
|
||||
bool LatencyBandConfig::CommitConfig::isEqual(RequestConfig const& r) const {
|
||||
CommitConfig const& other = static_cast<CommitConfig const&>(r);
|
||||
return RequestConfig::isEqual(r) && maxCommitBytes == other.maxCommitBytes;
|
||||
}
|
||||
|
||||
Optional<LatencyBandConfig> LatencyBandConfig::parse(ValueRef configurationString) {
|
||||
Optional<LatencyBandConfig> config;
|
||||
if(configurationString.size() == 0) {
|
||||
return config;
|
||||
}
|
||||
|
||||
json_spirit::mValue parsedConfig;
|
||||
if(!json_spirit::read_string(configurationString.toString(), parsedConfig)) {
|
||||
TraceEvent(SevWarnAlways, "InvalidLatencyBandConfiguration").detail("Reason", "InvalidJSON").detail("Configuration", printable(configurationString));
|
||||
return config;
|
||||
}
|
||||
|
||||
json_spirit::mObject configJson = parsedConfig.get_obj();
|
||||
|
||||
json_spirit::mValue schema;
|
||||
if(!json_spirit::read_string(JSONSchemas::latencyBandConfigurationSchema.toString(), schema)) {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
std::string errorStr;
|
||||
if(!schemaMatch(schema.get_obj(), configJson, errorStr)) {
|
||||
TraceEvent(SevWarnAlways, "InvalidLatencyBandConfiguration").detail("Reason", "SchemaMismatch").detail("Configuration", printable(configurationString)).detail("Error", errorStr);
|
||||
return config;
|
||||
}
|
||||
|
||||
JSONDoc configDoc(configJson);
|
||||
|
||||
config = LatencyBandConfig();
|
||||
|
||||
config.get().grvConfig.fromJson(configDoc.subDoc("get_read_version"));
|
||||
config.get().readConfig.fromJson(configDoc.subDoc("read"));
|
||||
config.get().commitConfig.fromJson(configDoc.subDoc("commit"));
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
bool LatencyBandConfig::operator==(LatencyBandConfig const& r) const {
|
||||
return grvConfig == r.grvConfig && readConfig == r.readConfig && commitConfig == r.commitConfig;
|
||||
}
|
||||
|
||||
bool LatencyBandConfig::operator!=(LatencyBandConfig const& r) const {
|
||||
return !(*this == r);
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* LatencyBandConfig.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_LATENCYBANDCONFIG_H
|
||||
#define FDBSERVER_LATENCYBANDCONFIG_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/JSONDoc.h"
|
||||
|
||||
struct LatencyBandConfig {
|
||||
struct RequestConfig {
|
||||
std::set<double> bands;
|
||||
|
||||
friend bool operator==(RequestConfig const& lhs, RequestConfig const& rhs);
|
||||
friend bool operator!=(RequestConfig const& lhs, RequestConfig const& rhs);
|
||||
|
||||
virtual void fromJson(JSONDoc json);
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
uint64_t bandsSize = (uint64_t)bands.size();
|
||||
ar & bandsSize;
|
||||
|
||||
if(ar.isDeserializing) {
|
||||
double band;
|
||||
for(uint64_t i = 0; i < bandsSize; i++) {
|
||||
ar & band;
|
||||
bands.insert(band);
|
||||
}
|
||||
}
|
||||
else {
|
||||
for(double band : bands) {
|
||||
ar & band;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual bool isEqual(RequestConfig const& r) const;
|
||||
};
|
||||
|
||||
struct GrvConfig : RequestConfig {};
|
||||
|
||||
struct ReadConfig : RequestConfig {
|
||||
Optional<int> maxReadBytes;
|
||||
Optional<int> maxKeySelectorOffset;
|
||||
|
||||
virtual void fromJson(JSONDoc json);
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
ar & *(RequestConfig*)this & maxReadBytes & maxKeySelectorOffset;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual bool isEqual(RequestConfig const& r) const;
|
||||
};
|
||||
|
||||
struct CommitConfig : RequestConfig {
|
||||
Optional<int> maxCommitBytes;
|
||||
|
||||
virtual void fromJson(JSONDoc json);
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
ar & *(RequestConfig*)this & maxCommitBytes;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual bool isEqual(RequestConfig const& r) const;
|
||||
};
|
||||
|
||||
GrvConfig grvConfig;
|
||||
ReadConfig readConfig;
|
||||
CommitConfig commitConfig;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
ar & grvConfig & readConfig & commitConfig;
|
||||
}
|
||||
|
||||
static Optional<LatencyBandConfig> parse(ValueRef configurationString);
|
||||
|
||||
bool operator==(LatencyBandConfig const& r) const;
|
||||
bool operator!=(LatencyBandConfig const& r) const;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -38,6 +38,7 @@
|
|||
#include "flow/Stats.h"
|
||||
#include "ApplyMetadataMutation.h"
|
||||
#include "RecoveryState.h"
|
||||
#include "fdbserver/LatencyBandConfig.h"
|
||||
#include "fdbclient/Atomic.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
|
||||
|
@ -72,16 +73,6 @@ struct ProxyStats {
|
|||
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
|
||||
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
|
||||
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
|
||||
|
||||
commitLatencyBands.addThreshold(0.001);
|
||||
commitLatencyBands.addThreshold(0.01);
|
||||
commitLatencyBands.addThreshold(0.1);
|
||||
commitLatencyBands.addThreshold(1);
|
||||
|
||||
grvLatencyBands.addThreshold(0.001);
|
||||
grvLatencyBands.addThreshold(0.01);
|
||||
grvLatencyBands.addThreshold(0.1);
|
||||
grvLatencyBands.addThreshold(1);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -222,6 +213,8 @@ struct ProxyCommitData {
|
|||
Version lastTxsPop;
|
||||
bool popRemoteTxs;
|
||||
|
||||
Optional<LatencyBandConfig> latencyBandConfig;
|
||||
|
||||
//The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
|
||||
//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
|
||||
//We do not repopulate them immediately to avoid a slow task.
|
||||
|
@ -982,7 +975,10 @@ ACTOR Future<Void> commitBatch(
|
|||
}
|
||||
|
||||
// TODO: filter if pipelined with large commit
|
||||
self->stats.commitLatencyBands.addMeasurement(endTime - trs[t].requestTime, maxTransactionBytes > 1e6);
|
||||
if(self->latencyBandConfig.present()) {
|
||||
bool filter = maxTransactionBytes > self->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(std::numeric_limits<int>::max());
|
||||
self->stats.commitLatencyBands.addMeasurement(endTime - trs[t].requestTime, filter);
|
||||
}
|
||||
}
|
||||
|
||||
++self->stats.commitBatchOut;
|
||||
|
@ -1439,6 +1435,34 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
}
|
||||
commitData.logSystem->pop(commitData.lastTxsPop, txsTag, 0, tagLocalityRemoteLog);
|
||||
}
|
||||
|
||||
Optional<LatencyBandConfig> newLatencyBandConfig = db->get().latencyBandConfig;
|
||||
|
||||
if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
|
||||
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().grvConfig != commitData.latencyBandConfig.get().grvConfig))
|
||||
{
|
||||
TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
|
||||
commitData.stats.grvLatencyBands.clearBands();
|
||||
if(newLatencyBandConfig.present()) {
|
||||
for(auto band : newLatencyBandConfig.get().grvConfig.bands) {
|
||||
commitData.stats.grvLatencyBands.addThreshold(band);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(newLatencyBandConfig.present() != commitData.latencyBandConfig.present()
|
||||
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().commitConfig != commitData.latencyBandConfig.get().commitConfig))
|
||||
{
|
||||
TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
|
||||
commitData.stats.commitLatencyBands.clearBands();
|
||||
if(newLatencyBandConfig.present()) {
|
||||
for(auto band : newLatencyBandConfig.get().commitConfig.bands) {
|
||||
commitData.stats.commitLatencyBands.addThreshold(band);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
commitData.latencyBandConfig = newLatencyBandConfig;
|
||||
}
|
||||
when(Void _ = wait(onError)) {}
|
||||
when(std::pair<vector<CommitTransactionRequest>, int> batchedRequests = waitNext(batchedCommits.getFuture())) {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "MasterInterface.h"
|
||||
#include "LogSystemConfig.h"
|
||||
#include "RecoveryState.h"
|
||||
#include "LatencyBandConfig.h"
|
||||
|
||||
struct ServerDBInfo {
|
||||
// This structure contains transient information which is broadcast to all workers for a database,
|
||||
|
@ -44,6 +45,7 @@ struct ServerDBInfo {
|
|||
LocalityData myLocality; // (Not serialized) Locality information, if available, for the *local* process
|
||||
LogSystemConfig logSystemConfig;
|
||||
std::vector<UID> priorCommittedLogServers; // If !fullyRecovered and logSystemConfig refers to a new log system which may not have been committed to the coordinated state yet, then priorCommittedLogServers are the previous, fully committed generation which need to stay alive in case this recovery fails
|
||||
Optional<LatencyBandConfig> latencyBandConfig;
|
||||
|
||||
ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED) {}
|
||||
explicit ServerDBInfo(StringRef const& dbName) : dbName(dbName), recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED) {}
|
||||
|
@ -53,7 +55,7 @@ struct ServerDBInfo {
|
|||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
ar & id & clusterInterface & client & master & resolvers & dbName & recoveryCount & masterLifetime & logSystemConfig & priorCommittedLogServers & recoveryState;
|
||||
ar & id & clusterInterface & client & master & resolvers & dbName & recoveryCount & masterLifetime & logSystemConfig & priorCommittedLogServers & recoveryState & latencyBandConfig;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -434,26 +434,27 @@ struct RolesInfo {
|
|||
obj["mutation_bytes"] = StatusCounter(metrics.getValue("MutationBytes")).getStatus();
|
||||
obj["mutations"] = StatusCounter(metrics.getValue("Mutations")).getStatus();
|
||||
|
||||
std::string regularPrefix = "ReadLatency";
|
||||
std::string filteredPrefix = "FilteredReadLatency";
|
||||
std::string latencyBandPrefix = "ReadLatency";
|
||||
|
||||
JsonBuilderObject latency;
|
||||
std::map<std::string, JsonBuilderObject> bands;
|
||||
|
||||
bool found = false;
|
||||
for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) {
|
||||
bool regularMeasurement = itr->first.substr(0, regularPrefix.size()) == regularPrefix;
|
||||
if(!regularMeasurement && itr->first.substr(0, filteredPrefix.size()) != filteredPrefix) {
|
||||
continue;
|
||||
}
|
||||
if(itr->first.substr(0, latencyBandPrefix.size()) == latencyBandPrefix) {
|
||||
found = true;
|
||||
std::string band = itr->first.substr(latencyBandPrefix.size());
|
||||
latency[band] = StatusCounter(itr->second).getCounter();
|
||||
}
|
||||
|
||||
std::string band = itr->first.substr(regularMeasurement ? regularPrefix.size() : filteredPrefix.size());
|
||||
//bands[band][regularMeasurement ? "counted" : "filtered"] = StatusCounter(itr->second).getCounter();
|
||||
latency[band] = StatusCounter(itr->second).getCounter();
|
||||
std::string value;
|
||||
if(metrics.tryGetValue("Filtered" + latencyBandPrefix, value)) {
|
||||
latency["filtered"] = StatusCounter(value).getCounter();
|
||||
}
|
||||
}
|
||||
if(found) {
|
||||
obj["read_latency_bands"] = latency;
|
||||
}
|
||||
/*for(auto itr : bands) {
|
||||
latency[itr.first] = itr.second;
|
||||
}*/
|
||||
obj["read_latency_bands"] = latency;
|
||||
|
||||
Version version = parseInt64(metrics.getValue("Version"));
|
||||
Version durableVersion = parseInt64(metrics.getValue("DurableVersion"));
|
||||
|
@ -531,19 +532,37 @@ struct RolesInfo {
|
|||
JsonBuilderObject grvLatency;
|
||||
JsonBuilderObject commitLatency;
|
||||
|
||||
bool grvFound = false;
|
||||
bool commitFound = false;
|
||||
for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) {
|
||||
if(itr->first.substr(0, grvPrefix.size()) == grvPrefix) {
|
||||
grvFound = true;
|
||||
std::string band = itr->first.substr(grvPrefix.size());
|
||||
grvLatency[band] = StatusCounter(itr->second).getCounter();
|
||||
}
|
||||
else if(itr->first.substr(0, commitPrefix.size()) == commitPrefix) {
|
||||
commitFound = true;
|
||||
std::string band = itr->first.substr(commitPrefix.size());
|
||||
commitLatency[band] = StatusCounter(itr->second).getCounter();
|
||||
}
|
||||
}
|
||||
|
||||
obj["grv_latency_bands"] = grvLatency;
|
||||
obj["commit_latency_bands"] = commitLatency;
|
||||
if(grvFound) {
|
||||
std::string value;
|
||||
if(metrics.tryGetValue("Filtered" + grvPrefix, value)) {
|
||||
grvLatency["filtered"] = StatusCounter(value).getCounter();
|
||||
}
|
||||
|
||||
obj["grv_latency_bands"] = grvLatency;
|
||||
}
|
||||
if(commitFound) {
|
||||
std::string value;
|
||||
if(metrics.tryGetValue("Filtered" + commitPrefix, value)) {
|
||||
commitLatency["filtered"] = StatusCounter(value).getCounter();
|
||||
}
|
||||
|
||||
obj["commit_latency_bands"] = commitLatency;
|
||||
}
|
||||
} catch (Error &e) {
|
||||
if(e.code() != error_code_attribute_not_found) {
|
||||
throw e;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<Project DefaultTargets="Build" ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||
<Import Project="$(SolutionDir)versions.target" />
|
||||
<PropertyGroup Condition="'$(Release)' != 'true' ">
|
||||
|
@ -53,6 +53,7 @@
|
|||
<ActorCompiler Include="LogSystemPeekCursor.actor.cpp" />
|
||||
<ActorCompiler Include="LogRouter.actor.cpp" />
|
||||
<ActorCompiler Include="OldTLogServer.actor.cpp" />
|
||||
<ClCompile Include="LatencyBandConfig.cpp" />
|
||||
<ClCompile Include="SkipList.cpp" />
|
||||
<ActorCompiler Include="WaitFailure.actor.cpp" />
|
||||
<ActorCompiler Include="tester.actor.cpp" />
|
||||
|
@ -154,6 +155,7 @@
|
|||
<ClInclude Include="DBCoreState.h" />
|
||||
<ClInclude Include="IDiskQueue.h" />
|
||||
<ClInclude Include="IKeyValueStore.h" />
|
||||
<ClInclude Include="LatencyBandConfig.h" />
|
||||
<ClInclude Include="LeaderElection.h" />
|
||||
<ClInclude Include="LogProtocolMessage.h" />
|
||||
<ClInclude Include="LogSystem.h" />
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||
<ItemGroup>
|
||||
<ActorCompiler Include="ClusterController.actor.cpp" />
|
||||
|
@ -292,7 +292,6 @@
|
|||
<Filter>sqlite</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="VFSAsync.cpp" />
|
||||
<ClCompile Include="DatabaseConfiguration.cpp" />
|
||||
<ClCompile Include="workloads\AsyncFile.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ClCompile>
|
||||
|
@ -300,6 +299,7 @@
|
|||
<ClCompile Include="workloads\MemoryKeyValueStore.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="LatencyBandConfig.cpp" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ClInclude Include="ConflictSet.h" />
|
||||
|
@ -362,6 +362,7 @@
|
|||
<ClInclude Include="RecoveryState.h" />
|
||||
<ClInclude Include="LogProtocolMessage.h" />
|
||||
<ClInclude Include="template_fdb.h" />
|
||||
<ClInclude Include="LatencyBandConfig.h" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<Filter Include="workloads">
|
||||
|
|
|
@ -47,6 +47,7 @@
|
|||
#include "LogSystem.h"
|
||||
#include "RecoveryState.h"
|
||||
#include "LogProtocolMessage.h"
|
||||
#include "fdbserver/LatencyBandConfig.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
|
||||
using std::make_pair;
|
||||
|
@ -55,9 +56,6 @@ using std::make_pair;
|
|||
|
||||
#define SHORT_CIRCUT_ACTUAL_STORAGE 0
|
||||
|
||||
int64_t MAX_RESULT_SIZE = 1e4;
|
||||
int64_t MAX_SELECTOR_OFFSET = 1e2;
|
||||
|
||||
struct StorageServer;
|
||||
class ValueOrClearToRef {
|
||||
public:
|
||||
|
@ -406,6 +404,8 @@ public:
|
|||
return val;
|
||||
}
|
||||
|
||||
Optional<LatencyBandConfig> latencyBandConfig;
|
||||
|
||||
struct Counters {
|
||||
CounterCollection cc;
|
||||
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries;
|
||||
|
@ -465,12 +465,6 @@ public:
|
|||
specialCounter(cc, "KvstoreBytesFree", [self](){ return self->storage.getStorageBytes().free; });
|
||||
specialCounter(cc, "KvstoreBytesAvailable", [self](){ return self->storage.getStorageBytes().available; });
|
||||
specialCounter(cc, "KvstoreBytesTotal", [self](){ return self->storage.getStorageBytes().total; });
|
||||
|
||||
readLatencyBands.addThreshold(0.0001);
|
||||
readLatencyBands.addThreshold(0.001);
|
||||
readLatencyBands.addThreshold(0.01);
|
||||
readLatencyBands.addThreshold(0.1);
|
||||
readLatencyBands.addThreshold(1);
|
||||
}
|
||||
} counters;
|
||||
|
||||
|
@ -790,7 +784,10 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
|||
|
||||
++data->counters.finishedQueries;
|
||||
--data->readQueueSizeMetric;
|
||||
data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > MAX_RESULT_SIZE);
|
||||
if(data->latencyBandConfig.present()) {
|
||||
int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
|
||||
data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes);
|
||||
}
|
||||
|
||||
return Void();
|
||||
};
|
||||
|
@ -1327,7 +1324,12 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
|||
|
||||
++data->counters.finishedQueries;
|
||||
--data->readQueueSizeMetric;
|
||||
data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > MAX_RESULT_SIZE || abs(req.begin.offset) > MAX_SELECTOR_OFFSET || abs(req.end.offset) > MAX_SELECTOR_OFFSET);
|
||||
|
||||
if(data->latencyBandConfig.present()) {
|
||||
int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
|
||||
int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
|
||||
data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes || abs(req.begin.offset) > maxSelectorOffset || abs(req.end.offset) > maxSelectorOffset);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -1378,7 +1380,11 @@ ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
|
|||
|
||||
++data->counters.finishedQueries;
|
||||
--data->readQueueSizeMetric;
|
||||
data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > MAX_RESULT_SIZE || abs(req.sel.offset > MAX_SELECTOR_OFFSET));
|
||||
if(data->latencyBandConfig.present()) {
|
||||
int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
|
||||
int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
|
||||
data->counters.readLatencyBands.addMeasurement(timer()-req.requestTime, resultSize > maxReadBytes || abs(req.sel.offset) > maxSelectorOffset);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -3303,6 +3309,20 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
doUpdate = Void();
|
||||
}
|
||||
}
|
||||
|
||||
Optional<LatencyBandConfig> newLatencyBandConfig = self->db->get().latencyBandConfig;
|
||||
if(newLatencyBandConfig.present() != self->latencyBandConfig.present()
|
||||
|| (newLatencyBandConfig.present() && newLatencyBandConfig.get().readConfig != self->latencyBandConfig.get().readConfig))
|
||||
{
|
||||
self->latencyBandConfig = newLatencyBandConfig;
|
||||
self->counters.readLatencyBands.clearBands();
|
||||
TraceEvent("LatencyBandReadUpdatingConfig").detail("Present", newLatencyBandConfig.present());
|
||||
if(self->latencyBandConfig.present()) {
|
||||
for(auto band : self->latencyBandConfig.get().readConfig.bands) {
|
||||
self->counters.readLatencyBands.addThreshold(band);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
|
||||
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
|
||||
|
|
45
flow/Stats.h
45
flow/Stats.h
|
@ -121,39 +121,54 @@ static void specialCounter(CounterCollection& collection, std::string const& nam
|
|||
|
||||
class LatencyBands {
|
||||
public:
|
||||
LatencyBands(std::string name, CounterCollection &cc) : name(name), cc(cc) {
|
||||
addThreshold(std::numeric_limits<double>::infinity());
|
||||
}
|
||||
LatencyBands(std::string name, CounterCollection &cc) : name(name), cc(cc), filteredCount(nullptr) {}
|
||||
|
||||
void addThreshold(double value) {
|
||||
if(value > 0 && bands.count(value) == 0) {
|
||||
bands.insert(std::make_pair(value, new Counter(format("%s%f", name.c_str(), value), cc)));
|
||||
filteredBands.insert(std::make_pair(value, new Counter(format("Filtered%s%f", name.c_str(), value), cc)));
|
||||
if(bands.size() == 0) {
|
||||
filteredCount = new Counter(format("Filtered%s", name.c_str()), cc);
|
||||
insertBand(std::numeric_limits<double>::infinity());
|
||||
}
|
||||
|
||||
insertBand(value);
|
||||
}
|
||||
}
|
||||
|
||||
void addMeasurement(double measurement, bool filtered=false) {
|
||||
const auto &targetBands = filtered ? filteredBands : bands;
|
||||
auto itr = targetBands.upper_bound(measurement);
|
||||
if(itr == targetBands.end()) {
|
||||
fprintf(stderr, "Can't add measurement %lf\n", measurement);
|
||||
if(filtered && filteredCount) {
|
||||
++(*filteredCount);
|
||||
}
|
||||
ASSERT(itr != targetBands.end());
|
||||
++(*itr->second);
|
||||
else if(bands.size() > 0) {
|
||||
auto itr = bands.upper_bound(measurement);
|
||||
ASSERT(itr != bands.end());
|
||||
++(*itr->second);
|
||||
}
|
||||
}
|
||||
|
||||
void clearBands() {
|
||||
for(auto itr : bands) {
|
||||
delete itr.second;
|
||||
}
|
||||
|
||||
bands.clear();
|
||||
|
||||
delete filteredCount;
|
||||
}
|
||||
|
||||
~LatencyBands() {
|
||||
for(auto itr = bands.begin(); itr != bands.end(); ++itr) {
|
||||
delete itr->second;
|
||||
}
|
||||
clearBands();
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<double, Counter*> bands;
|
||||
std::map<double, Counter*> filteredBands;
|
||||
Counter *filteredCount;
|
||||
|
||||
std::string name;
|
||||
CounterCollection &cc;
|
||||
|
||||
void insertBand(double value) {
|
||||
bands.insert(std::make_pair(value, new Counter(format("%s%f", name.c_str(), value), cc)));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue