Merge pull request #713 from etschannen/feature-configure-from-file

Added the ability to configure a cluster from a JSON file
This commit is contained in:
A.J. Beamon 2018-09-05 10:28:13 -07:00 committed by GitHub
commit e37f90ff2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 959 additions and 162 deletions

View File

@ -27,6 +27,7 @@
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/ManagementAPI.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/FDBOptions.g.h"
@ -443,8 +444,12 @@ void initHelp() {
"All keys between BEGINKEY (inclusive) and ENDKEY (exclusive) are cleared from the database. This command will succeed even if the specified range is empty, but may fail because of conflicts." ESCAPINGK);
helpMap["configure"] = CommandHelp(
"configure [new] <single|double|triple|three_data_hall|three_datacenter|ssd|memory|proxies=<PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*",
"change database configuration",
"change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. When used, both a redundancy mode and a storage engine must be specified.\n\nRedundancy mode:\n single - one copy of the data. Not fault tolerant.\n double - two copies of data (survive one failure).\n triple - three copies of data (survive two failures).\n three_data_hall - See the Admin Guide.\n three_datacenter - See the Admin Guide.\n\nStorage engine:\n ssd - B-Tree storage engine optimized for solid state disks.\n memory - Durable in-memory storage engine for small datasets.\n\nproxies=<PROXIES>: Sets the desired number of proxies in the cluster. Must be at least 1, or set to -1 which restores the number of proxies to the default value.\n\nlogs=<LOGS>: Sets the desired number of log servers in the cluster. Must be at least 1, or set to -1 which restores the number of logs to the default value.\n\nresolvers=<RESOLVERS>: Sets the desired number of resolvers in the cluster. Must be at least 1, or set to -1 which restores the number of resolvers to the default value.\n\nSee the FoundationDB Administration Guide for more information.");
helpMap["fileconfigure"] = CommandHelp(
"fileconfigure [new] <FILENAME>",
"change the database configuration from a file",
"The `new' option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. Load a JSON document from the provided file, and change the database configuration to match the contents of the JSON document. The format should be the same as the value of the \"configuration\" entry in status JSON without \"excluded_servers\" or \"coordinators_count\".");
helpMap["coordinators"] = CommandHelp(
"coordinators auto|<ADDRESS>+ [description=new_cluster_description]",
"change cluster coordinators or description",
@ -1573,9 +1578,96 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
case ConfigurationResult::UNKNOWN_OPTION:
case ConfigurationResult::INCOMPLETE_CONFIGURATION:
printUsage(tokens[0]);
ret = true;
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
printf("ERROR: These changes would make the configuration invalid\n");
ret=true;
break;
case ConfigurationResult::DATABASE_ALREADY_CREATED:
printf("ERROR: Database already exists! To change configuration, don't say `new'\n");
ret=true;
break;
case ConfigurationResult::DATABASE_CREATED:
printf("Database created\n");
ret=false;
break;
case ConfigurationResult::SUCCESS:
printf("Configuration changed\n");
ret=false;
break;
default:
ASSERT(false);
ret=true;
};
return ret;
}
ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDatabase) {
std::string contents(readFileBytes(filePath, 100000));
json_spirit::mValue config;
if(!json_spirit::read_string( contents, config )) {
printf("ERROR: Invalid JSON\n");
return true;
}
StatusObject configJSON = config.get_obj();
json_spirit::mValue schema;
if(!json_spirit::read_string( JSONSchemas::configurationSchema.toString(), schema )) {
ASSERT(false);
}
std::string errorStr;
if( !schemaMatch(schema.get_obj(), configJSON, errorStr) ) {
printf("%s", errorStr.c_str());
return true;
}
std::string configString;
if(isNewDatabase) {
configString = "new";
}
for(auto kv : configJSON) {
if(!configString.empty()) {
configString += " ";
}
if( kv.second.type() == json_spirit::int_type ) {
configString += kv.first + ":=" + format("%d", kv.second.get_int());
} else if( kv.second.type() == json_spirit::str_type ) {
configString += kv.second.get_str();
} else if( kv.second.type() == json_spirit::array_type ) {
configString += kv.first + "=" + json_spirit::write_string(json_spirit::mValue(kv.second.get_array()), json_spirit::Output_options::none);
} else {
printUsage(LiteralStringRef("fileconfigure"));
return true;
}
}
ConfigurationResult::Type result = wait( makeInterruptable( changeConfig(db, configString) ) );
// Real errors get thrown from makeInterruptable and printed by the catch block in cli(), but
// there are various results specific to changeConfig() that we need to report:
bool ret;
switch(result) {
case ConfigurationResult::NO_OPTIONS_PROVIDED:
printf("ERROR: No options provided\n");
ret=true;
break;
case ConfigurationResult::CONFLICTING_OPTIONS:
printf("ERROR: Conflicting options\n");
ret=true;
break;
case ConfigurationResult::UNKNOWN_OPTION:
printf("ERROR: Unknown option\n"); //This should not be possible because of schema match
ret=true;
break;
case ConfigurationResult::INCOMPLETE_CONFIGURATION:
printf("ERROR: Must specify both a replication level and a storage engine when creating a new database\n");
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
printf("ERROR: These changes would make the configuration invalid\n");
ret=true;
break;
case ConfigurationResult::DATABASE_ALREADY_CREATED:
printf("ERROR: Database already exists! To change configuration, don't say `new'\n");
ret=true;
@ -2457,6 +2549,17 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "fileconfigure")) {
if (tokens.size() == 2 || (tokens.size() == 3 && tokens[1] == LiteralStringRef("new"))) {
bool err = wait( fileConfigure( db, tokens.back().toString(), tokens.size() == 3 ) );
if (err) is_error = true;
} else {
printUsage(tokens[0]);
is_error = true;
}
continue;
}
if (tokencmp(tokens[0], "coordinators")) {
auto cs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
if (tokens.size() < 2) {

View File

@ -250,6 +250,23 @@ bool isCompleteConfiguration( std::map<std::string, std::string> const& options
options.count( p+"storage_engine" ) == 1;
}
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration( Database cx ) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Standalone<RangeResultRef> res = wait( tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY) );
ASSERT( res.size() < CLIENT_KNOBS->TOO_MANY );
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>) res);
return config;
} catch( Error &e ) {
Void _ = wait( tr.onError(e) );
}
}
}
ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std::string, std::string> m ) {
state StringRef initIdKey = LiteralStringRef( "\xff/init_id" );
state Transaction tr(cx);
@ -264,6 +281,19 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std:
m[initIdKey.toString()] = g_random->randomUniqueID().toString();
if (!isCompleteConfiguration(m))
return ConfigurationResult::INCOMPLETE_CONFIGURATION;
} else {
state Future<DatabaseConfiguration> fConfig = getDatabaseConfiguration(cx);
Void _ = wait( success(fConfig) || delay(1.0) );
if(fConfig.isReady()) {
DatabaseConfiguration config = fConfig.get();
for(auto kv : m) {
config.set(kv.first, kv.second);
}
if(!config.isValid()) {
return ConfigurationResult::INVALID_CONFIGURATION;
}
}
}
loop {
@ -1172,23 +1202,6 @@ ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion>
}
}
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration( Database cx ) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<RangeResultRef> res = wait( tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY) );
ASSERT( res.size() < CLIENT_KNOBS->TOO_MANY );
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>) res);
return config;
} catch( Error &e ) {
Void _ = wait( tr.onError(e) );
}
}
}
ACTOR Future<Void> waitForFullReplication( Database cx ) {
state ReadYourWritesTransaction tr(cx);
loop {
@ -1385,6 +1398,140 @@ ACTOR Future<Void> forceRecovery (Reference<ClusterConnectionFile> clusterFile)
}
}
json_spirit::Value_type normJSONType(json_spirit::Value_type type) {
if (type == json_spirit::int_type)
return json_spirit::real_type;
return type;
}
void schemaCoverage( std::string const& spath, bool covered ) {
static std::set<std::string> coveredSchemaPaths;
if (coveredSchemaPaths.insert(spath).second) {
TraceEvent ev(SevInfo, "CodeCoverage");
ev.detail("File", "documentation/StatusSchema.json/" + spath).detail("Line", 0);
if (!covered)
ev.detail("Covered", 0);
}
}
bool schemaMatch( StatusObject const schema, StatusObject const result, std::string& errorStr, Severity sev, bool checkCoverage, std::string path, std::string schema_path ) {
// Returns true if everything in `result` is permitted by `schema`
// Really this should recurse on "values" rather than "objects"?
bool ok = true;
try {
for(auto& rkv : result) {
auto& key = rkv.first;
auto& rv = rkv.second;
std::string kpath = path + "." + key;
std::string spath = schema_path + "." + key;
if(checkCoverage) schemaCoverage(spath);
if (!schema.count(key)) {
errorStr += format("ERROR: Unknown key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaPath", spath);
ok = false;
continue;
}
auto& sv = schema.at(key);
if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$enum")) {
auto& enum_values = sv.get_obj().at("$enum").get_array();
bool any_match = false;
for(auto& enum_item : enum_values)
if (enum_item == rv) {
any_match = true;
if(checkCoverage) schemaCoverage(spath + ".$enum." + enum_item.get_str());
break;
}
if (!any_match) {
errorStr += format("ERROR: Unknown value `%s' for key `%s'\n", json_spirit::write_string(rv).c_str(), kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaEnumItems", enum_values.size()).detail("Value", json_spirit::write_string(rv));
if(checkCoverage) schemaCoverage(spath + ".$enum." + json_spirit::write_string(rv));
ok = false;
}
} else if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$map")) {
if (rv.type() != json_spirit::obj_type) {
errorStr += format("ERROR: Expected an object as the value for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if(sv.get_obj().at("$map").type() != json_spirit::obj_type) {
continue;
}
auto& schema_obj = sv.get_obj().at("$map").get_obj();
auto& value_obj = rv.get_obj();
if(checkCoverage) schemaCoverage(spath + ".$map");
for(auto& value_pair : value_obj) {
auto vpath = kpath + "[" + value_pair.first + "]";
auto upath = spath + ".$map";
if (value_pair.second.type() != json_spirit::obj_type) {
errorStr += format("ERROR: Expected an object for `%s'\n", vpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", vpath).detail("ValueType", value_pair.second.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_pair.second.get_obj(), errorStr, sev, checkCoverage, vpath, upath))
ok = false;
}
} else {
// The schema entry isn't an operator, so it asserts a type and (depending on the type) recursive schema definition
if (normJSONType(sv.type()) != normJSONType(rv.type())) {
errorStr += format("ERROR: Incorrect value type for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if (rv.type() == json_spirit::array_type) {
auto& value_array = rv.get_array();
auto& schema_array = sv.get_array();
if (!schema_array.size()) {
// An empty schema array means that the value array is required to be empty
if (value_array.size()) {
errorStr += format("ERROR: Expected an empty array for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaSize", schema_array.size()).detail("ValueSize", value_array.size());
ok = false;
continue;
}
} else if (schema_array.size() == 1 && schema_array[0].type() == json_spirit::obj_type) {
// A one item schema array means that all items in the value must match the first item in the schema
auto& schema_obj = schema_array[0].get_obj();
int index = 0;
for(auto &value_item : value_array) {
if (value_item.type() != json_spirit::obj_type) {
errorStr += format("ERROR: Expected all array elements to be objects for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath + format("[%d]",index)).detail("ValueType", value_item.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_item.get_obj(), errorStr, sev, checkCoverage, kpath + format("[%d]", index), spath + "[0]"))
ok = false;
index++;
}
} else
ASSERT(false); // Schema doesn't make sense
} else if (rv.type() == json_spirit::obj_type) {
auto& schema_obj = sv.get_obj();
auto& value_obj = rv.get_obj();
if (!schemaMatch(schema_obj, value_obj, errorStr, sev, checkCoverage, kpath, spath))
ok = false;
}
}
}
return ok;
} catch (std::exception& e) {
TraceEvent(SevError, "SchemaMatchException").detail("What", e.what()).detail("Path", path).detail("SchemaPath", schema_path);
throw unknown_error();
}
}
TEST_CASE("ManagementAPI/AutoQuorumChange/checkLocality") {
Void _ = wait(Future<Void>(Void()));

View File

@ -46,6 +46,7 @@ public:
CONFLICTING_OPTIONS,
UNKNOWN_OPTION,
INCOMPLETE_CONFIGURATION,
INVALID_CONFIGURATION,
DATABASE_ALREADY_CREATED,
DATABASE_CREATED,
SUCCESS
@ -166,4 +167,8 @@ Future<Void> forceRecovery (Reference<ClusterConnectionFile> const& clusterFile)
// Gets the cluster connection string
Future<std::vector<NetworkAddress>> getCoordinators( Database const& cx );
void schemaCoverage( std::string const& spath, bool covered=true );
bool schemaMatch( StatusObject const schema, StatusObject const result, std::string& errorStr, Severity sev=SevError, bool checkCoverage=false, std::string path = std::string(), std::string schema_path = std::string() );
#endif

636
fdbclient/Schemas.cpp Normal file
View File

@ -0,0 +1,636 @@
/*
* Schemas.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Schemas.h"
const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
{
"cluster":{
"layers":{
"_valid":true,
"_error":"some error description"
},
"processes":{
"$map":{
"version":"3.0.0",
"machine_id":"0ccb4e0feddb5583010f6b77d9d10ece",
"locality":{
"$map":"value"
},
"class_source":{
"$enum":[
"command_line",
"configure_auto",
"set_class"
]
},
"class_type":{
"$enum":[
"unset",
"storage",
"transaction",
"resolution",
"proxy",
"master",
"test"
]
},
"roles":[
{
"query_queue_max":0,
"input_bytes":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"stored_bytes":12341234,
"kvstore_used_bytes":12341234,
"kvstore_available_bytes":12341234,
"kvstore_free_bytes":12341234,
"kvstore_total_bytes":12341234,
"durable_bytes":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"queue_disk_used_bytes":12341234,
"queue_disk_available_bytes":12341234,
"queue_disk_free_bytes":12341234,
"queue_disk_total_bytes":12341234,
"role":{
"$enum":[
"master",
"proxy",
"log",
"storage",
"resolver",
"cluster_controller"
]
},
"data_version":12341234,
"data_lag": {
"seconds":5.0,
"versions":12341234
},
"id":"eb84471d68c12d1d26f692a50000003f",
"finished_queries":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
}
],
"command_line":"-r simulation",
"memory":{
"available_bytes":0,
"limit_bytes":0,
"unused_allocated_memory":0,
"used_bytes":0
},
"messages":[
{
"time":12345.12312,
"type":"x",
"name":{
"$enum":[
"file_open_error",
"incorrect_cluster_file_contents",
"process_error",
"io_error",
"io_timeout",
"platform_error",
"storage_server_lagging",
"(other FDB error messages)"
]
},
"raw_log_message":"<stuff/>",
"description":"abc"
}
],
"fault_domain":"0ccb4e0fdbdb5583010f6b77d9d10ece",
"excluded":false,
"address":"1.2.3.4:1234",
"disk":{
"free_bytes":3451233456234,
"reads":{
"hz":0.0,
"counter":0,
"sectors":0
},
"busy":0.0,
"writes":{
"hz":0.0,
"counter":0,
"sectors":0
},
"total_bytes":123412341234
},
"uptime_seconds":1234.2345,
"cpu":{
"usage_cores":0.0
},
"network":{
"current_connections":0,
"connections_established":{
"hz":0.0
},
"connections_closed":{
"hz":0.0
},
"connection_errors":{
"hz":0.0
},
"megabits_sent":{
"hz":0.0
},
"megabits_received":{
"hz":0.0
}
}
}
},
"old_logs":[
{
"logs":[
{
"id":"7f8d623d0cb9966e",
"healthy":true,
"address":"1.2.3.4:1234"
}
],
"log_replication_factor":3,
"log_write_anti_quorum":0,
"log_fault_tolerance":2,
"remote_log_replication_factor":3,
"remote_log_fault_tolerance":2,
"satellite_log_replication_factor":3,
"satellite_log_write_anti_quorum":0,
"satellite_log_fault_tolerance":2
}
],
"fault_tolerance":{
"max_machine_failures_without_losing_availability":0,
"max_machine_failures_without_losing_data":0
},
"qos":{
"worst_queue_bytes_log_server":460,
"performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
"name":{
"$enum":[
"workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio"
]
},
"description":"The database is not being saturated by the workload."
},
"transactions_per_second_limit":0,
"released_transactions_per_second":0,
"limiting_queue_bytes_storage_server":0,
"worst_queue_bytes_storage_server":0,
"limiting_version_lag_storage_server":0,
"worst_version_lag_storage_server":0
},
"incompatible_connections":[
],
"datacenter_version_difference":0,
"database_available":true,
"database_locked":false,
"generation":2,
"latency_probe":{
"read_seconds":7,
"immediate_priority_transaction_start_seconds":0.0,
"batch_priority_transaction_start_seconds":0.0,
"transaction_start_seconds":0.0,
"commit_seconds":0.02
},
"clients":{
"count":1,
"supported_versions":[
{
"client_version":"3.0.0",
"connected_clients":[
{
"address":"127.0.0.1:9898",
"log_group":"default"
}
],
"count" : 1,
"protocol_version" : "fdb00a400050001",
"source_version" : "9430e1127b4991cbc5ab2b17f41cfffa5de07e9d"
}
]
},
"messages":[
{
"reasons":[
{
"description":"Blah."
}
],
"unreachable_processes":[
{
"address":"1.2.3.4:1234"
}
],
"name":{
"$enum":[
"unreachable_master_worker",
"unreadable_configuration",
"full_replication_timeout",
"client_issues",
"unreachable_processes",
"immediate_priority_transaction_start_probe_timeout",
"batch_priority_transaction_start_probe_timeout",
"transaction_start_probe_timeout",
"read_probe_timeout",
"commit_probe_timeout",
"storage_servers_error",
"status_incomplete",
"layer_status_incomplete",
"database_availability_timeout"
]
},
"issues":[
{
"name":{
"$enum":[
"incorrect_cluster_file_contents"
]
},
"description":"Cluster file contents do not match current cluster connection string. Verify cluster file is writable and has not been overwritten externally."
}
],
"description":"abc"
}
],
"recovery_state":{
"required_resolvers":1,
"required_proxies":1,
"name":{
"$enum":[
"reading_coordinated_state",
"locking_coordinated_state",
"locking_old_transaction_servers",
"reading_transaction_system_state",
"configuration_missing",
"configuration_never_created",
"configuration_invalid",
"recruiting_transaction_servers",
"initializing_transaction_servers",
"recovery_transaction",
"writing_coordinated_state",
"accepting_commits",
"all_logs_recruited",
"storage_recovered",
"fully_recovered"
]
},
"required_logs":3,
"missing_logs":"7f8d623d0cb9966e",
"description":"Recovery complete."
},
"workload":{
"operations":{
"writes":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"reads":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"bytes":{
"written":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"read":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"keys":{
"read":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"transactions":{
"started":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"conflicted":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"committed":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
}
},
"cluster_controller_timestamp":1415650089,
"protocol_version":"fdb00a400050001",
"connection_string":"a:a@127.0.0.1:4000",
"full_replication":true,
"configuration":{
"log_anti_quorum":0,
"log_replicas":2,
"log_replication_policy":"(zoneid^3x1)",
"redundancy_mode":{
"$enum":[
"single",
"double",
"triple",
"three_datacenter",
"three_datacenter_fallback",
"three_data_hall"
]},
"regions":[{
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
}],
"satellite_redundancy_mode":{
"$enum":[
"one_satellite_single",
"one_satellite_double",
"one_satellite_triple",
"two_satellite_safe",
"two_satellite_fast"
]},
"satellite_log_replicas":1,
"satellite_usable_dcs":1,
"satellite_anti_quorum":0,
"satellite_log_policy":"(zoneid^3x1)",
"satellite_logs":2
}],
"remote_redundancy_mode":{
"$enum":[
"remote_default",
"remote_single",
"remote_double",
"remote_triple",
"remote_three_data_hall"
]},
"remote_log_replicas":3,
"remote_logs":5,
"log_routers":10,
"usable_regions":1,
"repopulate_anti_quorum":1,
"storage_replicas":1,
"resolvers":1,
"storage_replication_policy":"(zoneid^3x1)",
"logs":2,
"storage_engine":{
"$enum":[
"ssd",
"ssd-1",
"ssd-2",
"memory"
]},
"coordinators_count":1,
"excluded_servers":[
{
"address":"10.0.4.1"
}
],
"auto_proxies":3,
"auto_resolvers":1,
"auto_logs":3,
"proxies":5
},
"data":{
"least_operating_space_bytes_log_server":0,
"average_partition_size_bytes":0,
"state":{
"healthy":true,
"min_replicas_remaining":0,
"name":{
"$enum":[
"initializing",
"missing_data",
"healing",
"healthy_repartitioning",
"healthy_removing_server",
"healthy_rebalancing",
"healthy"
]
},
"description":""
},
"least_operating_space_ratio_storage_server":0.1,
"max_machine_failures_without_losing_availability":0,
"total_disk_used_bytes":0,
"total_kv_size_bytes":0,
"partitions_count":2,
"moving_data":{
"total_written_bytes":0,
"in_flight_bytes":0,
"in_queue_bytes":0,
"highest_priority":0
},
"team_trackers":[
{
"primary":true,
"in_flight_bytes":0,
"unhealthy_servers":0,
"state":{
"healthy":true,
"min_replicas_remaining":0,
"name":{
"$enum":[
"initializing",
"missing_data",
"healing",
"healthy_repartitioning",
"healthy_removing_server",
"healthy_rebalancing",
"healthy"
]
},
"description":""
}
}
],
"least_operating_space_bytes_storage_server":0,
"max_machine_failures_without_losing_data":0
},
"machines":{
"$map":{
"network":{
"megabits_sent":{
"hz":0.0
},
"megabits_received":{
"hz":0.0
},
"tcp_segments_retransmitted":{
"hz":0.0
}
},
"memory":{
"free_bytes":0,
"committed_bytes":0,
"total_bytes":0
},
"contributing_workers":4,
"datacenter_id":"6344abf1813eb05b",
"excluded":false,
"address":"1.2.3.4",
"machine_id":"6344abf1813eb05b",
"locality":{
"$map":"value"
},
"cpu":{
"logical_core_utilization":0.4
}
}
}
},
"client":{
"coordinators":{
"coordinators":[
{
"reachable":true,
"address":"127.0.0.1:4701"
}
],
"quorum_reachable":true
},
"database_status":{
"available":true,
"healthy":true
},
"messages":[
{
"name":{
"$enum":[
"inconsistent_cluster_file",
"unreachable_cluster_controller",
"no_cluster_controller",
"status_incomplete_client",
"status_incomplete_coordinators",
"status_incomplete_error",
"status_incomplete_timeout",
"status_incomplete_cluster",
"quorum_not_reachable"
]
},
"description":"The cluster file is not up to date."
}
],
"timestamp":1415650089,
"cluster_file":{
"path":"/etc/foundationdb/fdb.cluster",
"up_to_date":true
}
}
})statusSchema");
const KeyRef JSONSchemas::configurationSchema = LiteralStringRef(R"configSchema(
{
"create":{
"$enum":[
"new"
]},
"log_anti_quorum":0,
"log_replicas":2,
"log_replication_policy":"(zoneid^3x1)",
"redundancy_mode":{
"$enum":[
"single",
"double",
"triple",
"three_datacenter",
"three_datacenter_fallback",
"three_data_hall"
]},
"regions":[{
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
}],
"satellite_redundancy_mode":{
"$enum":[
"one_satellite_single",
"one_satellite_double",
"one_satellite_triple",
"two_satellite_safe",
"two_satellite_fast"
]},
"satellite_log_replicas":1,
"satellite_usable_dcs":1,
"satellite_anti_quorum":0,
"satellite_log_policy":"(zoneid^3x1)",
"satellite_logs":2
}],
"remote_redundancy_mode":{
"$enum":[
"remote_default",
"remote_single",
"remote_double",
"remote_triple",
"remote_three_data_hall"
]},
"remote_log_replicas":3,
"remote_logs":5,
"log_routers":10,
"usable_regions":1,
"repopulate_anti_quorum":1,
"storage_replicas":1,
"resolvers":1,
"storage_replication_policy":"(zoneid^3x1)",
"logs":2,
"storage_engine":{
"$enum":[
"ssd",
"ssd-1",
"ssd-2",
"memory"
]},
"auto_proxies":3,
"auto_resolvers":1,
"auto_logs":3,
"proxies":5
})configSchema");

34
fdbclient/Schemas.h Normal file
View File

@ -0,0 +1,34 @@
/*
* Schemas.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_SCHEMAS_H
#define FDBCLIENT_SCHEMAS_H
#pragma once
#include "flow/flow.h"
#include "FDBTypes.h"
struct JSONSchemas {
static const KeyRef statusSchema;
static const KeyRef configurationSchema;
};
#endif /* FDBCLIENT_SCHEMAS_H */

View File

@ -65,6 +65,7 @@
<ClInclude Include="ReadYourWrites.h" />
<ActorCompiler Include="RunTransaction.actor.h" />
<ClInclude Include="RYWIterator.h" />
<ClInclude Include="Schemas.h" />
<ClInclude Include="SnapshotCache.h" />
<ClInclude Include="Status.h" />
<ClInclude Include="StatusClient.h" />
@ -97,6 +98,7 @@
<ActorCompiler Include="MultiVersionTransaction.actor.cpp" />
<ClCompile Include="RYWIterator.cpp" />
<ActorCompiler Include="StatusClient.actor.cpp" />
<ClCompile Include="Schemas.cpp" />
<ClCompile Include="SystemData.cpp" />
<ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
<ActorCompiler Include="TaskBucket.actor.cpp" />

View File

@ -24,6 +24,8 @@
#include "workloads.h"
#include "fdbclient/StatusClient.h"
#include "flow/UnitTest.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/ManagementAPI.h"
extern bool noUnseed;
@ -31,7 +33,7 @@ struct StatusWorkload : TestWorkload {
double testDuration, requestsPerSecond;
PerfIntCounter requests, replies, errors, totalSize;
Optional<StatusObject> statusSchema;
Optional<StatusObject> parsedSchema;
StatusWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx),
@ -39,14 +41,14 @@ struct StatusWorkload : TestWorkload {
{
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
requestsPerSecond = getOption(options, LiteralStringRef("requestsPerSecond"), 0.5);
auto statusSchemaStr = getOption(options, LiteralStringRef("schema"), StringRef());
auto statusSchemaStr = getOption(options, LiteralStringRef("schema"), JSONSchemas::statusSchema);
if (statusSchemaStr.size()) {
json_spirit::mValue schema;
json_spirit::read_string( statusSchemaStr.toString(), schema );
statusSchema = schema.get_obj();
parsedSchema = schema.get_obj();
// This is sort of a hack, but generate code coverage *requirements* for everything in schema
schemaCoverageRequirements(statusSchema.get());
schemaCoverageRequirements(parsedSchema.get());
}
noUnseed = true;
@ -80,30 +82,19 @@ struct StatusWorkload : TestWorkload {
m.push_back(errors.getMetric());
}
template <bool Covered=true>
static void schemaCoverage( std::string const& spath ) {
static std::set<std::string> coveredSchemaPaths;
if (coveredSchemaPaths.insert(spath).second) {
TraceEvent ev(SevInfo, "CodeCoverage");
ev.detail("File", "documentation/StatusSchema.json/" + spath).detail("Line", 0);
if (!Covered)
ev.detail("Covered", 0);
}
}
static void schemaCoverageRequirements( StatusObject const& schema, std::string schema_path = std::string() ) {
try {
for(auto& skv : schema) {
std::string spath = schema_path + "." + skv.first;
schemaCoverage<false>(spath);
schemaCoverage(spath, false);
if (skv.second.type() == json_spirit::array_type && skv.second.get_array().size()) {
schemaCoverageRequirements( skv.second.get_array()[0].get_obj(), spath + "[0]" );
} else if (skv.second.type() == json_spirit::obj_type) {
if (skv.second.get_obj().count("$enum")) {
for(auto& enum_item : skv.second.get_obj().at("$enum").get_array())
schemaCoverage<false>(spath + ".$enum." + enum_item.get_str());
schemaCoverage(spath + ".$enum." + enum_item.get_str(), false);
} else
schemaCoverageRequirements( skv.second.get_obj(), spath );
}
@ -117,125 +108,6 @@ struct StatusWorkload : TestWorkload {
}
}
static json_spirit::Value_type normJSONType(json_spirit::Value_type type) {
if (type == json_spirit::int_type)
return json_spirit::real_type;
return type;
}
static bool schemaMatch( StatusObject const schema, StatusObject const result, Severity sev=SevError, std::string path = std::string(), std::string schema_path = std::string() ) {
// Returns true if everything in `result` is permitted by `schema`
// Really this should recurse on "values" rather than "objects"?
bool ok = true;
try {
for(auto& rkv : result) {
auto& key = rkv.first;
auto& rv = rkv.second;
std::string kpath = path + "." + key;
std::string spath = schema_path + "." + key;
schemaCoverage(spath);
if (!schema.count(key)) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaPath", spath);
ok = false;
continue;
}
auto& sv = schema.at(key);
if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$enum")) {
auto& enum_values = sv.get_obj().at("$enum").get_array();
bool any_match = false;
for(auto& enum_item : enum_values)
if (enum_item == rv) {
any_match = true;
schemaCoverage(spath + ".$enum." + enum_item.get_str());
break;
}
if (!any_match) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaEnumItems", enum_values.size()).detail("Value", json_spirit::write_string(rv));
schemaCoverage(spath + ".$enum." + json_spirit::write_string(rv));
ok = false;
}
} else if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$map")) {
if (rv.type() != json_spirit::obj_type) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if(sv.get_obj().at("$map").type() != json_spirit::obj_type) {
continue;
}
auto& schema_obj = sv.get_obj().at("$map").get_obj();
auto& value_obj = rv.get_obj();
schemaCoverage(spath + ".$map");
for(auto& value_pair : value_obj) {
auto vpath = kpath + "[" + value_pair.first + "]";
auto upath = spath + ".$map";
if (value_pair.second.type() != json_spirit::obj_type) {
TraceEvent(sev, "SchemaMismatch").detail("Path", vpath).detail("ValueType", value_pair.second.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_pair.second.get_obj(), sev, vpath, upath))
ok = false;
}
} else {
// The schema entry isn't an operator, so it asserts a type and (depending on the type) recursive schema definition
if (normJSONType(sv.type()) != normJSONType(rv.type())) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if (rv.type() == json_spirit::array_type) {
auto& value_array = rv.get_array();
auto& schema_array = sv.get_array();
if (!schema_array.size()) {
// An empty schema array means that the value array is required to be empty
if (value_array.size()) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaSize", schema_array.size()).detail("ValueSize", value_array.size());
ok = false;
continue;
}
} else if (schema_array.size() == 1 && schema_array[0].type() == json_spirit::obj_type) {
// A one item schema array means that all items in the value must match the first item in the schema
auto& schema_obj = schema_array[0].get_obj();
int index = 0;
for(auto &value_item : value_array) {
if (value_item.type() != json_spirit::obj_type) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath + format("[%d]",index)).detail("ValueType", value_item.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_item.get_obj(), sev, kpath + format("[%d]", index), spath + "[0]"))
ok = false;
index++;
}
} else
ASSERT(false); // Schema doesn't make sense
} else if (rv.type() == json_spirit::obj_type) {
auto& schema_obj = sv.get_obj();
auto& value_obj = rv.get_obj();
if (!schemaMatch(schema_obj, value_obj, sev, kpath, spath))
ok = false;
}
}
}
return ok;
} catch (std::exception& e) {
TraceEvent(SevError, "SchemaMatchException").detail("What", e.what()).detail("Path", path).detail("SchemaPath", schema_path);
throw unknown_error();
}
}
ACTOR Future<Void> fetcher(Reference<ClusterConnectionFile> connFile, StatusWorkload *self) {
state double lastTime = now();
@ -251,8 +123,8 @@ struct StatusWorkload : TestWorkload {
save(br, result);
self->totalSize += br.getLength();
TraceEvent("StatusWorkloadReply").detail("ReplySize", br.getLength()).detail("Latency", now() - issued);//.detail("Reply", json_spirit::write_string(json_spirit::mValue(result)));
if (self->statusSchema.present() && !schemaMatch(self->statusSchema.get(), result) )
std::string errorStr;
if (self->parsedSchema.present() && !schemaMatch(self->parsedSchema.get(), result, errorStr, SevError, true) )
TraceEvent(SevError, "StatusWorkloadValidationFailed").detail("JSON", json_spirit::write_string(json_spirit::mValue(result)));
}
catch (Error& e) {
@ -277,7 +149,8 @@ TEST_CASE("fdbserver/status/schema/basic") {
json_spirit::mValue test;
json_spirit::read_string( t, test );
TraceEvent("SchemaMatch").detail("Schema", json_spirit::write_string(schema)).detail("Value", json_spirit::write_string(test)).detail("Expect", expect_ok);
ASSERT( expect_ok == StatusWorkload::schemaMatch(schema.get_obj(), test.get_obj(), expect_ok ? SevError : SevInfo) );
std::string errorStr;
ASSERT( expect_ok == schemaMatch(schema.get_obj(), test.get_obj(), errorStr, expect_ok ? SevError : SevInfo, true) );
};
check(true, "{}");
check(true, "{\"apple\":4}");
@ -293,4 +166,4 @@ TEST_CASE("fdbserver/status/schema/basic") {
check(false, "{\"mapped\":{\"item1\":{\"x\":false},\"item2\":{\"y\":1}}}");
return Void();
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long