Merge branch 'release-6.0' of github.com:apple/foundationdb into status-json

# Conflicts:
#	fdbclient/fdbclient.vcxproj
#	fdbserver/Status.actor.cpp
This commit is contained in:
Alvin Moore 2018-09-05 14:44:09 -07:00
commit 221f73e69e
41 changed files with 1414 additions and 355 deletions

View File

@ -378,7 +378,7 @@ class DirectoryTest(Test):
def get_result_specifications(self):
return [
ResultSpecification(self.stack_subspace, key_start_index=1, ordering_index=1, global_error_filter=[1021]),
ResultSpecification(self.stack_subspace, key_start_index=1, ordering_index=1, global_error_filter=[1007, 1021]),
ResultSpecification(self.directory_log, ordering_index=0),
ResultSpecification(self.subspace_log, ordering_index=0)
]

View File

@ -30,8 +30,6 @@ import (
"log"
"os"
"strings"
"unicode"
"unicode/utf8"
)
type Option struct {
@ -114,23 +112,14 @@ func translateName(old string) string {
return strings.Replace(strings.Title(strings.Replace(old, "_", " ", -1)), " ", "", -1)
}
func lowerFirst(s string) string {
if s == "" {
return ""
}
r, n := utf8.DecodeRuneInString(s)
return string(unicode.ToLower(r)) + s[n:]
}
func writeMutation(opt Option) {
desc := lowerFirst(opt.Description)
tname := translateName(opt.Name)
fmt.Printf(`
// %s %s
// %s
func (t Transaction) %s(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, %d)
}
`, tname, desc, tname, opt.Code)
`, opt.Description, tname, opt.Code)
}
func writeEnum(scope Scope, opt Option, delta int) {
@ -207,7 +196,7 @@ func int64ToBytes(i int64) ([]byte, error) {
receiver := scope.Name + "s"
for _, opt := range scope.Option {
if opt.Description != "Deprecated" && !opt.Hidden { // Eww
if !opt.Hidden {
writeOpt(receiver, opt)
}
}
@ -216,7 +205,7 @@ func int64ToBytes(i int64) ([]byte, error) {
if scope.Name == "MutationType" {
for _, opt := range scope.Option {
if opt.Description != "Deprecated" && !opt.Hidden { // Eww
if !opt.Hidden {
writeMutation(opt)
}
}

View File

@ -42,6 +42,20 @@ func int64ToBytes(i int64) ([]byte, error) {
return buf.Bytes(), nil
}
// Deprecated
//
// Parameter: IP:PORT
func (o NetworkOptions) SetLocalAddress(param string) error {
return o.setOpt(10, []byte(param))
}
// Deprecated
//
// Parameter: path to cluster file
func (o NetworkOptions) SetClusterFile(param string) error {
return o.setOpt(20, []byte(param))
}
// Enables trace output to a file in a directory of the clients choosing
//
// Parameter: path to output directory (or NULL for current working directory)
@ -85,7 +99,7 @@ func (o NetworkOptions) SetKnob(param string) error {
return o.setOpt(40, []byte(param))
}
// Set the TLS plugin to load. This option, if used, must be set before any other TLS options
// Deprecated
//
// Parameter: file path or linker-resolved name
func (o NetworkOptions) SetTLSPlugin(param string) error {
@ -280,6 +294,11 @@ func (o TransactionOptions) SetReadYourWritesDisable() error {
return o.setOpt(51, nil)
}
// Deprecated
func (o TransactionOptions) SetReadAheadDisable() error {
return o.setOpt(52, nil)
}
// Not yet implemented.
func (o TransactionOptions) SetDurabilityDatacenter() error {
return o.setOpt(110, nil)
@ -290,6 +309,11 @@ func (o TransactionOptions) SetDurabilityRisky() error {
return o.setOpt(120, nil)
}
// Deprecated
func (o TransactionOptions) SetDurabilityDevNullIsWebScale() error {
return o.setOpt(130, nil)
}
// Specifies that this transaction should be treated as highest priority and that lower priority transactions should block behind this one. Use is discouraged outside of low-level tools
func (o TransactionOptions) SetPrioritySystemImmediate() error {
return o.setOpt(200, nil)
@ -431,57 +455,72 @@ const (
StreamingModeSerial StreamingMode = 5
)
// Add performs an addition of little-endian integers. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The integers to be added must be stored in a little-endian representation. They can be signed in two's complement representation or unsigned. You can add to an integer at a known offset in the value by prepending the appropriate number of zero bytes to ``param`` and padding with zero bytes to match the length of the value. However, this offset technique requires that you know the addition will not cause the integer field within the value to overflow.
// Performs an addition of little-endian integers. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The integers to be added must be stored in a little-endian representation. They can be signed in two's complement representation or unsigned. You can add to an integer at a known offset in the value by prepending the appropriate number of zero bytes to ``param`` and padding with zero bytes to match the length of the value. However, this offset technique requires that you know the addition will not cause the integer field within the value to overflow.
func (t Transaction) Add(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 2)
}
// BitAnd performs a bitwise ``and`` operation. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``.
// Deprecated
func (t Transaction) And(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 6)
}
// Performs a bitwise ``and`` operation. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``.
func (t Transaction) BitAnd(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 6)
}
// BitOr performs a bitwise ``or`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``.
// Deprecated
func (t Transaction) Or(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 7)
}
// Performs a bitwise ``or`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``.
func (t Transaction) BitOr(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 7)
}
// BitXor performs a bitwise ``xor`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``.
// Deprecated
func (t Transaction) Xor(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 8)
}
// Performs a bitwise ``xor`` operation. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``.
func (t Transaction) BitXor(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 8)
}
// AppendIfFits appends ``param`` to the end of the existing value already in the database at the given key (or creates the key and sets the value to ``param`` if the key is empty). This will only append the value if the final concatenated value size is less than or equal to the maximum value size (i.e., if it fits). WARNING: No error is surfaced back to the user if the final value is too large because the mutation will not be applied until after the transaction has been committed. Therefore, it is only safe to use this mutation type if one can guarantee that one will keep the total value size under the maximum size.
// Appends ``param`` to the end of the existing value already in the database at the given key (or creates the key and sets the value to ``param`` if the key is empty). This will only append the value if the final concatenated value size is less than or equal to the maximum value size (i.e., if it fits). WARNING: No error is surfaced back to the user if the final value is too large because the mutation will not be applied until after the transaction has been committed. Therefore, it is only safe to use this mutation type if one can guarantee that one will keep the total value size under the maximum size.
func (t Transaction) AppendIfFits(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 9)
}
// Max performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The larger of the two values is then stored in the database.
// Performs a little-endian comparison of byte strings. If the existing value in the database is not present or shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The larger of the two values is then stored in the database.
func (t Transaction) Max(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 12)
}
// Min performs a little-endian comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The smaller of the two values is then stored in the database.
// Performs a little-endian comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored in the database. If the existing value in the database is shorter than ``param``, it is first extended to the length of ``param`` with zero bytes. If ``param`` is shorter than the existing value in the database, the existing value is truncated to match the length of ``param``. The smaller of the two values is then stored in the database.
func (t Transaction) Min(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 13)
}
// SetVersionstampedKey transforms ``key`` using a versionstamp for the transaction. Sets the transformed key in the database to ``param``. The key is transformed by removing the final four bytes from the key and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the key from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the key is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java and Python bindings. Also, note that prior to API version 520, the offset was computed from only the final two bytes rather than the final four bytes.
// Transforms ``key`` using a versionstamp for the transaction. Sets the transformed key in the database to ``param``. The key is transformed by removing the final four bytes from the key and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the key from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the key is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java and Python bindings. Also, note that prior to API version 520, the offset was computed from only the final two bytes rather than the final four bytes.
func (t Transaction) SetVersionstampedKey(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 14)
}
// SetVersionstampedValue transforms ``param`` using a versionstamp for the transaction. Sets the ``key`` given to the transformed ``param``. The parameter is transformed by removing the final four bytes from ``param`` and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the parameter is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java and Python bindings. Also, note that prior to API version 520, the versionstamp was always placed at the beginning of the parameter rather than computing an offset.
// Transforms ``param`` using a versionstamp for the transaction. Sets the ``key`` given to the transformed ``param``. The parameter is transformed by removing the final four bytes from ``param`` and reading those as a little-Endian 32-bit integer to get a position ``pos``. The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the versionstamp of the transaction used. The first byte of the parameter is position 0. A versionstamp is a 10 byte, unique, monotonically (but not sequentially) increasing value for each committed transaction. The first 8 bytes are the committed version of the database (serialized in big-Endian order). The last 2 bytes are monotonic in the serialization order for transactions. WARNING: At this time, versionstamps are compatible with the Tuple layer only in the Java and Python bindings. Also, note that prior to API version 520, the versionstamp was always placed at the beginning of the parameter rather than computing an offset.
func (t Transaction) SetVersionstampedValue(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 15)
}
// ByteMin performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the smaller of the two values is then stored in the database.
// Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the smaller of the two values is then stored in the database.
func (t Transaction) ByteMin(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 16)
}
// ByteMax performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the larger of the two values is then stored in the database.
// Performs lexicographic comparison of byte strings. If the existing value in the database is not present, then ``param`` is stored. Otherwise the larger of the two values is then stored in the database.
func (t Transaction) ByteMax(key KeyConvertible, param []byte) {
t.atomicOp(key.FDBKey(), param, 17)
}

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.0.8.pkg <https://www.foundationdb.org/downloads/6.0.8/macOS/installers/FoundationDB-6.0.8.pkg>`_
* `FoundationDB-6.0.9.pkg <https://www.foundationdb.org/downloads/6.0.9/macOS/installers/FoundationDB-6.0.9.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.0.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.8/ubuntu/installers/foundationdb-clients_6.0.8-1_amd64.deb>`_
* `foundationdb-server-6.0.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.8/ubuntu/installers/foundationdb-server_6.0.8-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.0.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.9/ubuntu/installers/foundationdb-clients_6.0.9-1_amd64.deb>`_
* `foundationdb-server-6.0.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.9/ubuntu/installers/foundationdb-server_6.0.9-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.0.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.8/rhel6/installers/foundationdb-clients-6.0.8-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.0.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.8/rhel6/installers/foundationdb-server-6.0.8-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.0.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.9/rhel6/installers/foundationdb-clients-6.0.9-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.0.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.9/rhel6/installers/foundationdb-server-6.0.9-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.0.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.8/rhel7/installers/foundationdb-clients-6.0.8-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.0.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.8/rhel7/installers/foundationdb-server-6.0.8-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.0.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.9/rhel7/installers/foundationdb-clients-6.0.9-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.0.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.9/rhel7/installers/foundationdb-server-6.0.9-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.0.8-x64.msi <https://www.foundationdb.org/downloads/6.0.8/windows/installers/foundationdb-6.0.8-x64.msi>`_
* `foundationdb-6.0.9-x64.msi <https://www.foundationdb.org/downloads/6.0.9/windows/installers/foundationdb-6.0.9-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.0.8.tar.gz <https://www.foundationdb.org/downloads/6.0.8/bindings/python/foundationdb-6.0.8.tar.gz>`_
* `foundationdb-6.0.9.tar.gz <https://www.foundationdb.org/downloads/6.0.9/bindings/python/foundationdb-6.0.9.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.0.8.gem <https://www.foundationdb.org/downloads/6.0.8/bindings/ruby/fdb-6.0.8.gem>`_
* `fdb-6.0.9.gem <https://www.foundationdb.org/downloads/6.0.9/bindings/ruby/fdb-6.0.9.gem>`_
Java 8+
-------
* `fdb-java-6.0.8.jar <https://www.foundationdb.org/downloads/6.0.8/bindings/java/fdb-java-6.0.8.jar>`_
* `fdb-java-6.0.8-javadoc.jar <https://www.foundationdb.org/downloads/6.0.8/bindings/java/fdb-java-6.0.8-javadoc.jar>`_
* `fdb-java-6.0.9.jar <https://www.foundationdb.org/downloads/6.0.9/bindings/java/fdb-java-6.0.9.jar>`_
* `fdb-java-6.0.9-javadoc.jar <https://www.foundationdb.org/downloads/6.0.9/bindings/java/fdb-java-6.0.9-javadoc.jar>`_
Go 1.1+
-------

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.0.8
6.0.9
=====
Features
@ -48,6 +48,9 @@ Fixes
* Sometimes a minority of coordinators would fail to converge after a new leader was elected. [6.0.6] `(PR #700) <https://github.com/apple/foundationdb/pull/700>`_
* Calling status too many times in a 5 second interval caused the cluster controller to pause for a few seconds. [6.0.7] `(PR #711) <https://github.com/apple/foundationdb/pull/711>`_
* Configuring from usable_regions=2 to usable_regions=1 on a cluster with a large number of processes would prevent data distribution from completing. [6.0.8] `(PR #721) <https://github.com/apple/foundationdb/pull/721>`_
* TLS certificate reloading could cause TLS connections to drop until process restart. [6.0.9] `(PR #717) <https://github.com/apple/foundationdb/pull/717>`_
* Watches polled the server much more frequently than intended. [6.0.10] `(PR #728) <https://github.com/apple/foundationdb/pull/728>`_
* Backup and DR didn't allow setting certain knobs. [6.0.10] `(Issue #715) <https://github.com/apple/foundationdb/issues/715>`_
Status
------
@ -63,6 +66,7 @@ Bindings
* API version updated to 600. There are no changes since API version 520.
* Several cases where functions in go might previously cause a panic now return a non-``nil`` error. `(PR #532) <https://github.com/apple/foundationdb/pull/532>`_
* C API calls made on the network thread could be reordered with calls made from other threads. [6.0.2] `(Issue #518) <https://github.com/apple/foundationdb/issues/518>`_
* The TLS_PLUGIN option is now a no-op and has been deprecated. [6.0.10] `(PR #710) <https://github.com/apple/foundationdb/pull/710>`_
Other Changes
-------------

View File

@ -2620,13 +2620,19 @@ int main(int argc, char* argv[]) {
commandLine += argv[a];
}
delete FLOW_KNOBS;
FlowKnobs* flowKnobs = new FlowKnobs(true);
FLOW_KNOBS = flowKnobs;
delete CLIENT_KNOBS;
ClientKnobs* clientKnobs = new ClientKnobs(true);
CLIENT_KNOBS = clientKnobs;
for(auto k=knobs.begin(); k!=knobs.end(); ++k) {
try {
if (!clientKnobs->setKnob( k->first, k->second )) {
if (!flowKnobs->setKnob( k->first, k->second ) &&
!clientKnobs->setKnob( k->first, k->second ))
{
fprintf(stderr, "Unrecognized knob option '%s'\n", k->first.c_str());
return FDB_EXIT_ERROR;
}

View File

@ -27,6 +27,7 @@
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/ManagementAPI.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/FDBOptions.g.h"
@ -443,8 +444,12 @@ void initHelp() {
"All keys between BEGINKEY (inclusive) and ENDKEY (exclusive) are cleared from the database. This command will succeed even if the specified range is empty, but may fail because of conflicts." ESCAPINGK);
helpMap["configure"] = CommandHelp(
"configure [new] <single|double|triple|three_data_hall|three_datacenter|ssd|memory|proxies=<PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*",
"change database configuration",
"change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. When used, both a redundancy mode and a storage engine must be specified.\n\nRedundancy mode:\n single - one copy of the data. Not fault tolerant.\n double - two copies of data (survive one failure).\n triple - three copies of data (survive two failures).\n three_data_hall - See the Admin Guide.\n three_datacenter - See the Admin Guide.\n\nStorage engine:\n ssd - B-Tree storage engine optimized for solid state disks.\n memory - Durable in-memory storage engine for small datasets.\n\nproxies=<PROXIES>: Sets the desired number of proxies in the cluster. Must be at least 1, or set to -1 which restores the number of proxies to the default value.\n\nlogs=<LOGS>: Sets the desired number of log servers in the cluster. Must be at least 1, or set to -1 which restores the number of logs to the default value.\n\nresolvers=<RESOLVERS>: Sets the desired number of resolvers in the cluster. Must be at least 1, or set to -1 which restores the number of resolvers to the default value.\n\nSee the FoundationDB Administration Guide for more information.");
helpMap["fileconfigure"] = CommandHelp(
"fileconfigure [new] <FILENAME>",
"change the database configuration from a file",
"The `new' option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. Load a JSON document from the provided file, and change the database configuration to match the contents of the JSON document. The format should be the same as the value of the \"configuration\" entry in status JSON without \"excluded_servers\" or \"coordinators_count\".");
helpMap["coordinators"] = CommandHelp(
"coordinators auto|<ADDRESS>+ [description=new_cluster_description]",
"change cluster coordinators or description",
@ -1573,9 +1578,96 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
case ConfigurationResult::UNKNOWN_OPTION:
case ConfigurationResult::INCOMPLETE_CONFIGURATION:
printUsage(tokens[0]);
ret = true;
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
printf("ERROR: These changes would make the configuration invalid\n");
ret=true;
break;
case ConfigurationResult::DATABASE_ALREADY_CREATED:
printf("ERROR: Database already exists! To change configuration, don't say `new'\n");
ret=true;
break;
case ConfigurationResult::DATABASE_CREATED:
printf("Database created\n");
ret=false;
break;
case ConfigurationResult::SUCCESS:
printf("Configuration changed\n");
ret=false;
break;
default:
ASSERT(false);
ret=true;
};
return ret;
}
ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDatabase) {
std::string contents(readFileBytes(filePath, 100000));
json_spirit::mValue config;
if(!json_spirit::read_string( contents, config )) {
printf("ERROR: Invalid JSON\n");
return true;
}
StatusObject configJSON = config.get_obj();
json_spirit::mValue schema;
if(!json_spirit::read_string( JSONSchemas::configurationSchema.toString(), schema )) {
ASSERT(false);
}
std::string errorStr;
if( !schemaMatch(schema.get_obj(), configJSON, errorStr) ) {
printf("%s", errorStr.c_str());
return true;
}
std::string configString;
if(isNewDatabase) {
configString = "new";
}
for(auto kv : configJSON) {
if(!configString.empty()) {
configString += " ";
}
if( kv.second.type() == json_spirit::int_type ) {
configString += kv.first + ":=" + format("%d", kv.second.get_int());
} else if( kv.second.type() == json_spirit::str_type ) {
configString += kv.second.get_str();
} else if( kv.second.type() == json_spirit::array_type ) {
configString += kv.first + "=" + json_spirit::write_string(json_spirit::mValue(kv.second.get_array()), json_spirit::Output_options::none);
} else {
printUsage(LiteralStringRef("fileconfigure"));
return true;
}
}
ConfigurationResult::Type result = wait( makeInterruptable( changeConfig(db, configString) ) );
// Real errors get thrown from makeInterruptable and printed by the catch block in cli(), but
// there are various results specific to changeConfig() that we need to report:
bool ret;
switch(result) {
case ConfigurationResult::NO_OPTIONS_PROVIDED:
printf("ERROR: No options provided\n");
ret=true;
break;
case ConfigurationResult::CONFLICTING_OPTIONS:
printf("ERROR: Conflicting options\n");
ret=true;
break;
case ConfigurationResult::UNKNOWN_OPTION:
printf("ERROR: Unknown option\n"); //This should not be possible because of schema match
ret=true;
break;
case ConfigurationResult::INCOMPLETE_CONFIGURATION:
printf("ERROR: Must specify both a replication level and a storage engine when creating a new database\n");
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
printf("ERROR: These changes would make the configuration invalid\n");
ret=true;
break;
case ConfigurationResult::DATABASE_ALREADY_CREATED:
printf("ERROR: Database already exists! To change configuration, don't say `new'\n");
ret=true;
@ -2457,6 +2549,17 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "fileconfigure")) {
if (tokens.size() == 2 || (tokens.size() == 3 && tokens[1] == LiteralStringRef("new"))) {
bool err = wait( fileConfigure( db, tokens.back().toString(), tokens.size() == 3 ) );
if (err) is_error = true;
} else {
printUsage(tokens[0]);
is_error = true;
}
continue;
}
if (tokencmp(tokens[0], "coordinators")) {
auto cs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
if (tokens.size() < 2) {

View File

@ -177,7 +177,8 @@ bool DatabaseConfiguration::isValid() const {
usableRegions <= 2 &&
regions.size() <= 2 &&
( usableRegions == 1 || regions.size() == 2 ) &&
( regions.size() == 0 || regions[0].priority >= 0 ) ) ) {
( regions.size() == 0 || regions[0].priority >= 0 ) &&
( regions.size() == 0 || tLogPolicy->info() != "dcid^2 x zoneid^2 x 1") ) ) { //We cannot specify regions with three_datacenter replication
return false;
}

View File

@ -39,6 +39,8 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( FAILURE_MIN_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_MIN_DELAY = 1.0;
init( FAILURE_TIMEOUT_DELAY, FAILURE_MIN_DELAY );
init( CLIENT_FAILURE_TIMEOUT_DELAY, FAILURE_MIN_DELAY );
init( FAILURE_EMERGENCY_DELAY, 60.0 );
init( FAILURE_MAX_GENERATIONS, 4 );
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
@ -77,7 +79,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( WATCH_POLLING_TIME, 1.0 ); if( randomize && BUGGIFY ) WATCH_POLLING_TIME = 5.0;
init( NO_RECENT_UPDATES_DURATION, 20.0 ); if( randomize && BUGGIFY ) NO_RECENT_UPDATES_DURATION = 0.1;
init( FAST_WATCH_TIMEOUT, 20.0 ); if( randomize && BUGGIFY ) FAST_WATCH_TIMEOUT = 1.0;
init( WATCH_TIMEOUT, 900.0 ); if( randomize ) WATCH_TIMEOUT = 20.0;
init( WATCH_TIMEOUT, 900.0 ); if( randomize && BUGGIFY ) WATCH_TIMEOUT = 20.0;
// Core
init( CORE_VERSIONSPERSECOND, 1e6 );

View File

@ -38,6 +38,8 @@ public:
double FAILURE_MIN_DELAY;
double FAILURE_TIMEOUT_DELAY;
double CLIENT_FAILURE_TIMEOUT_DELAY;
double FAILURE_EMERGENCY_DELAY;
double FAILURE_MAX_GENERATIONS;
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)

View File

@ -250,6 +250,23 @@ bool isCompleteConfiguration( std::map<std::string, std::string> const& options
options.count( p+"storage_engine" ) == 1;
}
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration( Database cx ) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Standalone<RangeResultRef> res = wait( tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY) );
ASSERT( res.size() < CLIENT_KNOBS->TOO_MANY );
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>) res);
return config;
} catch( Error &e ) {
Void _ = wait( tr.onError(e) );
}
}
}
ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std::string, std::string> m ) {
state StringRef initIdKey = LiteralStringRef( "\xff/init_id" );
state Transaction tr(cx);
@ -264,6 +281,19 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std:
m[initIdKey.toString()] = g_random->randomUniqueID().toString();
if (!isCompleteConfiguration(m))
return ConfigurationResult::INCOMPLETE_CONFIGURATION;
} else {
state Future<DatabaseConfiguration> fConfig = getDatabaseConfiguration(cx);
Void _ = wait( success(fConfig) || delay(1.0) );
if(fConfig.isReady()) {
DatabaseConfiguration config = fConfig.get();
for(auto kv : m) {
config.set(kv.first, kv.second);
}
if(!config.isValid()) {
return ConfigurationResult::INVALID_CONFIGURATION;
}
}
}
loop {
@ -1172,23 +1202,6 @@ ACTOR Future<Void> waitForExcludedServers( Database cx, vector<AddressExclusion>
}
}
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration( Database cx ) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<RangeResultRef> res = wait( tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY) );
ASSERT( res.size() < CLIENT_KNOBS->TOO_MANY );
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>) res);
return config;
} catch( Error &e ) {
Void _ = wait( tr.onError(e) );
}
}
}
ACTOR Future<Void> waitForFullReplication( Database cx ) {
state ReadYourWritesTransaction tr(cx);
loop {
@ -1385,6 +1398,140 @@ ACTOR Future<Void> forceRecovery (Reference<ClusterConnectionFile> clusterFile)
}
}
json_spirit::Value_type normJSONType(json_spirit::Value_type type) {
if (type == json_spirit::int_type)
return json_spirit::real_type;
return type;
}
void schemaCoverage( std::string const& spath, bool covered ) {
static std::set<std::string> coveredSchemaPaths;
if (coveredSchemaPaths.insert(spath).second) {
TraceEvent ev(SevInfo, "CodeCoverage");
ev.detail("File", "documentation/StatusSchema.json/" + spath).detail("Line", 0);
if (!covered)
ev.detail("Covered", 0);
}
}
bool schemaMatch( StatusObject const schema, StatusObject const result, std::string& errorStr, Severity sev, bool checkCoverage, std::string path, std::string schema_path ) {
// Returns true if everything in `result` is permitted by `schema`
// Really this should recurse on "values" rather than "objects"?
bool ok = true;
try {
for(auto& rkv : result) {
auto& key = rkv.first;
auto& rv = rkv.second;
std::string kpath = path + "." + key;
std::string spath = schema_path + "." + key;
if(checkCoverage) schemaCoverage(spath);
if (!schema.count(key)) {
errorStr += format("ERROR: Unknown key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaPath", spath);
ok = false;
continue;
}
auto& sv = schema.at(key);
if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$enum")) {
auto& enum_values = sv.get_obj().at("$enum").get_array();
bool any_match = false;
for(auto& enum_item : enum_values)
if (enum_item == rv) {
any_match = true;
if(checkCoverage) schemaCoverage(spath + ".$enum." + enum_item.get_str());
break;
}
if (!any_match) {
errorStr += format("ERROR: Unknown value `%s' for key `%s'\n", json_spirit::write_string(rv).c_str(), kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaEnumItems", enum_values.size()).detail("Value", json_spirit::write_string(rv));
if(checkCoverage) schemaCoverage(spath + ".$enum." + json_spirit::write_string(rv));
ok = false;
}
} else if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$map")) {
if (rv.type() != json_spirit::obj_type) {
errorStr += format("ERROR: Expected an object as the value for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if(sv.get_obj().at("$map").type() != json_spirit::obj_type) {
continue;
}
auto& schema_obj = sv.get_obj().at("$map").get_obj();
auto& value_obj = rv.get_obj();
if(checkCoverage) schemaCoverage(spath + ".$map");
for(auto& value_pair : value_obj) {
auto vpath = kpath + "[" + value_pair.first + "]";
auto upath = spath + ".$map";
if (value_pair.second.type() != json_spirit::obj_type) {
errorStr += format("ERROR: Expected an object for `%s'\n", vpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", vpath).detail("ValueType", value_pair.second.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_pair.second.get_obj(), errorStr, sev, checkCoverage, vpath, upath))
ok = false;
}
} else {
// The schema entry isn't an operator, so it asserts a type and (depending on the type) recursive schema definition
if (normJSONType(sv.type()) != normJSONType(rv.type())) {
errorStr += format("ERROR: Incorrect value type for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if (rv.type() == json_spirit::array_type) {
auto& value_array = rv.get_array();
auto& schema_array = sv.get_array();
if (!schema_array.size()) {
// An empty schema array means that the value array is required to be empty
if (value_array.size()) {
errorStr += format("ERROR: Expected an empty array for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaSize", schema_array.size()).detail("ValueSize", value_array.size());
ok = false;
continue;
}
} else if (schema_array.size() == 1 && schema_array[0].type() == json_spirit::obj_type) {
// A one item schema array means that all items in the value must match the first item in the schema
auto& schema_obj = schema_array[0].get_obj();
int index = 0;
for(auto &value_item : value_array) {
if (value_item.type() != json_spirit::obj_type) {
errorStr += format("ERROR: Expected all array elements to be objects for key `%s'\n", kpath.c_str());
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath + format("[%d]",index)).detail("ValueType", value_item.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_item.get_obj(), errorStr, sev, checkCoverage, kpath + format("[%d]", index), spath + "[0]"))
ok = false;
index++;
}
} else
ASSERT(false); // Schema doesn't make sense
} else if (rv.type() == json_spirit::obj_type) {
auto& schema_obj = sv.get_obj();
auto& value_obj = rv.get_obj();
if (!schemaMatch(schema_obj, value_obj, errorStr, sev, checkCoverage, kpath, spath))
ok = false;
}
}
}
return ok;
} catch (std::exception& e) {
TraceEvent(SevError, "SchemaMatchException").detail("What", e.what()).detail("Path", path).detail("SchemaPath", schema_path);
throw unknown_error();
}
}
TEST_CASE("ManagementAPI/AutoQuorumChange/checkLocality") {
Void _ = wait(Future<Void>(Void()));

View File

@ -46,6 +46,7 @@ public:
CONFLICTING_OPTIONS,
UNKNOWN_OPTION,
INCOMPLETE_CONFIGURATION,
INVALID_CONFIGURATION,
DATABASE_ALREADY_CREATED,
DATABASE_CREATED,
SUCCESS
@ -166,4 +167,8 @@ Future<Void> forceRecovery (Reference<ClusterConnectionFile> const& clusterFile)
// Gets the cluster connection string
Future<std::vector<NetworkAddress>> getCoordinators( Database const& cx );
void schemaCoverage( std::string const& spath, bool covered=true );
bool schemaMatch( StatusObject const schema, StatusObject const result, std::string& errorStr, Severity sev=SevError, bool checkCoverage=false, std::string path = std::string(), std::string schema_path = std::string() );
#endif

View File

@ -311,7 +311,7 @@ ACTOR Future<Void> monitorNominee( Key key, ClientLeaderRegInterface coord, Asyn
state Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.getLeader, GetLeaderRequest( key, info->present() ? info->get().changeID : UID() ), TaskCoordinationReply ) );
Void _ = wait( Future<Void>(Void()) ); // Make sure we weren't cancelled
TraceEvent("GetLeaderReply").detail("Coordinator", coord.getLeader.getEndpoint().address).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation);
TraceEvent("GetLeaderReply").suppressFor(1.0).detail("Coordinator", coord.getLeader.getEndpoint().address).detail("Nominee", li.present() ? li.get().changeID : UID()).detail("Generation", generation);
if (li != *info) {
*info = li;

View File

@ -789,7 +789,6 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
}
case FDBNetworkOptions::TLS_PLUGIN:
validateOptionValue(value, true);
initTLSOptions();
break;
case FDBNetworkOptions::TLS_CERT_PATH:
validateOptionValue(value, true);

637
fdbclient/Schemas.cpp Normal file
View File

@ -0,0 +1,637 @@
/*
* Schemas.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Schemas.h"
const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
{
"cluster":{
"layers":{
"_valid":true,
"_error":"some error description"
},
"processes":{
"$map":{
"version":"3.0.0",
"machine_id":"0ccb4e0feddb5583010f6b77d9d10ece",
"locality":{
"$map":"value"
},
"class_source":{
"$enum":[
"command_line",
"configure_auto",
"set_class"
]
},
"class_type":{
"$enum":[
"unset",
"storage",
"transaction",
"resolution",
"proxy",
"master",
"test"
]
},
"roles":[
{
"query_queue_max":0,
"input_bytes":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"stored_bytes":12341234,
"kvstore_used_bytes":12341234,
"kvstore_available_bytes":12341234,
"kvstore_free_bytes":12341234,
"kvstore_total_bytes":12341234,
"durable_bytes":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"queue_disk_used_bytes":12341234,
"queue_disk_available_bytes":12341234,
"queue_disk_free_bytes":12341234,
"queue_disk_total_bytes":12341234,
"role":{
"$enum":[
"master",
"proxy",
"log",
"storage",
"resolver",
"cluster_controller"
]
},
"data_version":12341234,
"data_lag": {
"seconds":5.0,
"versions":12341234
},
"id":"eb84471d68c12d1d26f692a50000003f",
"finished_queries":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
}
],
"command_line":"-r simulation",
"memory":{
"available_bytes":0,
"limit_bytes":0,
"unused_allocated_memory":0,
"used_bytes":0
},
"messages":[
{
"time":12345.12312,
"type":"x",
"name":{
"$enum":[
"file_open_error",
"incorrect_cluster_file_contents",
"process_error",
"io_error",
"io_timeout",
"platform_error",
"storage_server_lagging",
"(other FDB error messages)"
]
},
"raw_log_message":"<stuff/>",
"description":"abc"
}
],
"fault_domain":"0ccb4e0fdbdb5583010f6b77d9d10ece",
"excluded":false,
"address":"1.2.3.4:1234",
"disk":{
"free_bytes":3451233456234,
"reads":{
"hz":0.0,
"counter":0,
"sectors":0
},
"busy":0.0,
"writes":{
"hz":0.0,
"counter":0,
"sectors":0
},
"total_bytes":123412341234
},
"uptime_seconds":1234.2345,
"cpu":{
"usage_cores":0.0
},
"network":{
"current_connections":0,
"connections_established":{
"hz":0.0
},
"connections_closed":{
"hz":0.0
},
"connection_errors":{
"hz":0.0
},
"megabits_sent":{
"hz":0.0
},
"megabits_received":{
"hz":0.0
}
}
}
},
"old_logs":[
{
"logs":[
{
"id":"7f8d623d0cb9966e",
"healthy":true,
"address":"1.2.3.4:1234"
}
],
"log_replication_factor":3,
"log_write_anti_quorum":0,
"log_fault_tolerance":2,
"remote_log_replication_factor":3,
"remote_log_fault_tolerance":2,
"satellite_log_replication_factor":3,
"satellite_log_write_anti_quorum":0,
"satellite_log_fault_tolerance":2
}
],
"fault_tolerance":{
"max_machine_failures_without_losing_availability":0,
"max_machine_failures_without_losing_data":0
},
"qos":{
"worst_queue_bytes_log_server":460,
"performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
"name":{
"$enum":[
"workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio"
]
},
"description":"The database is not being saturated by the workload."
},
"transactions_per_second_limit":0,
"released_transactions_per_second":0,
"limiting_queue_bytes_storage_server":0,
"worst_queue_bytes_storage_server":0,
"limiting_version_lag_storage_server":0,
"worst_version_lag_storage_server":0
},
"incompatible_connections":[
],
"datacenter_version_difference":0,
"database_available":true,
"database_locked":false,
"generation":2,
"latency_probe":{
"read_seconds":7,
"immediate_priority_transaction_start_seconds":0.0,
"batch_priority_transaction_start_seconds":0.0,
"transaction_start_seconds":0.0,
"commit_seconds":0.02
},
"clients":{
"count":1,
"supported_versions":[
{
"client_version":"3.0.0",
"connected_clients":[
{
"address":"127.0.0.1:9898",
"log_group":"default"
}
],
"count" : 1,
"protocol_version" : "fdb00a400050001",
"source_version" : "9430e1127b4991cbc5ab2b17f41cfffa5de07e9d"
}
]
},
"messages":[
{
"reasons":[
{
"description":"Blah."
}
],
"unreachable_processes":[
{
"address":"1.2.3.4:1234"
}
],
"name":{
"$enum":[
"unreachable_master_worker",
"unreadable_configuration",
"full_replication_timeout",
"client_issues",
"unreachable_processes",
"immediate_priority_transaction_start_probe_timeout",
"batch_priority_transaction_start_probe_timeout",
"transaction_start_probe_timeout",
"read_probe_timeout",
"commit_probe_timeout",
"storage_servers_error",
"status_incomplete",
"layer_status_incomplete",
"database_availability_timeout"
]
},
"issues":[
{
"name":{
"$enum":[
"incorrect_cluster_file_contents"
]
},
"description":"Cluster file contents do not match current cluster connection string. Verify cluster file is writable and has not been overwritten externally."
}
],
"description":"abc"
}
],
)statusSchema" R"statusSchema(
"recovery_state":{
"required_resolvers":1,
"required_proxies":1,
"name":{
"$enum":[
"reading_coordinated_state",
"locking_coordinated_state",
"locking_old_transaction_servers",
"reading_transaction_system_state",
"configuration_missing",
"configuration_never_created",
"configuration_invalid",
"recruiting_transaction_servers",
"initializing_transaction_servers",
"recovery_transaction",
"writing_coordinated_state",
"accepting_commits",
"all_logs_recruited",
"storage_recovered",
"fully_recovered"
]
},
"required_logs":3,
"missing_logs":"7f8d623d0cb9966e",
"description":"Recovery complete."
},
"workload":{
"operations":{
"writes":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"reads":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"bytes":{
"written":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"read":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"keys":{
"read":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"transactions":{
"started":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"conflicted":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"committed":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
}
},
"cluster_controller_timestamp":1415650089,
"protocol_version":"fdb00a400050001",
"connection_string":"a:a@127.0.0.1:4000",
"full_replication":true,
"configuration":{
"log_anti_quorum":0,
"log_replicas":2,
"log_replication_policy":"(zoneid^3x1)",
"redundancy_mode":{
"$enum":[
"single",
"double",
"triple",
"three_datacenter",
"three_datacenter_fallback",
"three_data_hall"
]},
"regions":[{
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
}],
"satellite_redundancy_mode":{
"$enum":[
"one_satellite_single",
"one_satellite_double",
"one_satellite_triple",
"two_satellite_safe",
"two_satellite_fast"
]},
"satellite_log_replicas":1,
"satellite_usable_dcs":1,
"satellite_anti_quorum":0,
"satellite_log_policy":"(zoneid^3x1)",
"satellite_logs":2
}],
"remote_redundancy_mode":{
"$enum":[
"remote_default",
"remote_single",
"remote_double",
"remote_triple",
"remote_three_data_hall"
]},
"remote_log_replicas":3,
"remote_logs":5,
"log_routers":10,
"usable_regions":1,
"repopulate_anti_quorum":1,
"storage_replicas":1,
"resolvers":1,
"storage_replication_policy":"(zoneid^3x1)",
"logs":2,
"storage_engine":{
"$enum":[
"ssd",
"ssd-1",
"ssd-2",
"memory"
]},
"coordinators_count":1,
"excluded_servers":[
{
"address":"10.0.4.1"
}
],
"auto_proxies":3,
"auto_resolvers":1,
"auto_logs":3,
"proxies":5
},
"data":{
"least_operating_space_bytes_log_server":0,
"average_partition_size_bytes":0,
"state":{
"healthy":true,
"min_replicas_remaining":0,
"name":{
"$enum":[
"initializing",
"missing_data",
"healing",
"healthy_repartitioning",
"healthy_removing_server",
"healthy_rebalancing",
"healthy"
]
},
"description":""
},
"least_operating_space_ratio_storage_server":0.1,
"max_machine_failures_without_losing_availability":0,
"total_disk_used_bytes":0,
"total_kv_size_bytes":0,
"partitions_count":2,
"moving_data":{
"total_written_bytes":0,
"in_flight_bytes":0,
"in_queue_bytes":0,
"highest_priority":0
},
"team_trackers":[
{
"primary":true,
"in_flight_bytes":0,
"unhealthy_servers":0,
"state":{
"healthy":true,
"min_replicas_remaining":0,
"name":{
"$enum":[
"initializing",
"missing_data",
"healing",
"healthy_repartitioning",
"healthy_removing_server",
"healthy_rebalancing",
"healthy"
]
},
"description":""
}
}
],
"least_operating_space_bytes_storage_server":0,
"max_machine_failures_without_losing_data":0
},
"machines":{
"$map":{
"network":{
"megabits_sent":{
"hz":0.0
},
"megabits_received":{
"hz":0.0
},
"tcp_segments_retransmitted":{
"hz":0.0
}
},
"memory":{
"free_bytes":0,
"committed_bytes":0,
"total_bytes":0
},
"contributing_workers":4,
"datacenter_id":"6344abf1813eb05b",
"excluded":false,
"address":"1.2.3.4",
"machine_id":"6344abf1813eb05b",
"locality":{
"$map":"value"
},
"cpu":{
"logical_core_utilization":0.4
}
}
}
},
"client":{
"coordinators":{
"coordinators":[
{
"reachable":true,
"address":"127.0.0.1:4701"
}
],
"quorum_reachable":true
},
"database_status":{
"available":true,
"healthy":true
},
"messages":[
{
"name":{
"$enum":[
"inconsistent_cluster_file",
"unreachable_cluster_controller",
"no_cluster_controller",
"status_incomplete_client",
"status_incomplete_coordinators",
"status_incomplete_error",
"status_incomplete_timeout",
"status_incomplete_cluster",
"quorum_not_reachable"
]
},
"description":"The cluster file is not up to date."
}
],
"timestamp":1415650089,
"cluster_file":{
"path":"/etc/foundationdb/fdb.cluster",
"up_to_date":true
}
}
})statusSchema");
const KeyRef JSONSchemas::configurationSchema = LiteralStringRef(R"configSchema(
{
"create":{
"$enum":[
"new"
]},
"log_anti_quorum":0,
"log_replicas":2,
"log_replication_policy":"(zoneid^3x1)",
"redundancy_mode":{
"$enum":[
"single",
"double",
"triple",
"three_datacenter",
"three_datacenter_fallback",
"three_data_hall"
]},
"regions":[{
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
}],
"satellite_redundancy_mode":{
"$enum":[
"one_satellite_single",
"one_satellite_double",
"one_satellite_triple",
"two_satellite_safe",
"two_satellite_fast"
]},
"satellite_log_replicas":1,
"satellite_usable_dcs":1,
"satellite_anti_quorum":0,
"satellite_log_policy":"(zoneid^3x1)",
"satellite_logs":2
}],
"remote_redundancy_mode":{
"$enum":[
"remote_default",
"remote_single",
"remote_double",
"remote_triple",
"remote_three_data_hall"
]},
"remote_log_replicas":3,
"remote_logs":5,
"log_routers":10,
"usable_regions":1,
"repopulate_anti_quorum":1,
"storage_replicas":1,
"resolvers":1,
"storage_replication_policy":"(zoneid^3x1)",
"logs":2,
"storage_engine":{
"$enum":[
"ssd",
"ssd-1",
"ssd-2",
"memory"
]},
"auto_proxies":3,
"auto_resolvers":1,
"auto_logs":3,
"proxies":5
})configSchema");

34
fdbclient/Schemas.h Normal file
View File

@ -0,0 +1,34 @@
/*
* Schemas.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_SCHEMAS_H
#define FDBCLIENT_SCHEMAS_H
#pragma once
#include "flow/flow.h"
#include "FDBTypes.h"
struct JSONSchemas {
static const KeyRef statusSchema;
static const KeyRef configurationSchema;
};
#endif /* FDBCLIENT_SCHEMAS_H */

View File

@ -65,6 +65,7 @@
<ClInclude Include="ReadYourWrites.h" />
<ActorCompiler Include="RunTransaction.actor.h" />
<ClInclude Include="RYWIterator.h" />
<ClInclude Include="Schemas.h" />
<ClInclude Include="SnapshotCache.h" />
<ClInclude Include="Status.h" />
<ClInclude Include="StatusClient.h" />
@ -97,7 +98,8 @@
<ActorCompiler Include="MultiVersionTransaction.actor.cpp" />
<ClCompile Include="RYWIterator.cpp" />
<ActorCompiler Include="StatusClient.actor.cpp" />
<ClCompile Include="Status.cpp" />
<ClCompile Include="Status.cpp" />
<ClCompile Include="Schemas.cpp" />
<ClCompile Include="SystemData.cpp" />
<ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
<ActorCompiler Include="TaskBucket.actor.cpp" />

View File

@ -53,7 +53,7 @@ description is not currently required but encouraged.
description="Set internal tuning or debugging knobs"/>
<Option name="TLS_plugin" code="41"
paramType="String" paramDescription="file path or linker-resolved name"
description="Set the TLS plugin to load. This option, if used, must be set before any other TLS options" />
description="Deprecated" />
<Option name="TLS_cert_bytes" code="42"
paramType="Bytes" paramDescription="certificates"
description="Set the certificate chain" />

View File

@ -283,6 +283,7 @@ struct Peer : NonCopyable {
if ( !destination.isPublic() || outgoingConnectionIdle || destination > transport->localAddress ) {
// Keep the new connection
TraceEvent("IncomingConnection", conn->getDebugID())
.suppressFor(1.0)
.detail("FromAddr", conn->getPeerAddress())
.detail("CanonicalAddr", destination)
.detail("IsPublic", destination.isPublic());
@ -292,6 +293,7 @@ struct Peer : NonCopyable {
connect = connectionKeeper( this, conn, reader );
} else {
TraceEvent("RedundantConnection", conn->getDebugID())
.suppressFor(1.0)
.detail("FromAddr", conn->getPeerAddress().toString())
.detail("CanonicalAddr", destination);
@ -321,7 +323,7 @@ struct Peer : NonCopyable {
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePing.getEndpoint() );
choose {
when (Void _ = wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) { TraceEvent("ConnectionTimeout").detail("WithAddr", peer->destination); throw connection_failed(); }
when (Void _ = wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) { TraceEvent("ConnectionTimeout").suppressFor(1.0).detail("WithAddr", peer->destination); throw connection_failed(); }
when (Void _ = wait( reply.getFuture() )) {}
when (Void _ = wait( peer->incompatibleDataRead.onTrigger())) {}
}

View File

@ -83,8 +83,7 @@ ACTOR static Future<Void> handshake( TLSConnection* self ) {
Void _ = wait( r == ITLSSession::WANT_WRITE ? self->conn->onWritable() : self->conn->onReadable() );
}
TraceEvent("TLSConnectionHandshakeSuccessful", self->getDebugID())
.detail("Peer", self->getPeerAddress());
TraceEvent("TLSConnectionHandshakeSuccessful", self->getDebugID()).suppressFor(1.0).detail("Peer", self->getPeerAddress());
return Void();
}
@ -231,6 +230,7 @@ void TLSOptions::set_ca_data(std::string const& ca_data) {
init_plugin();
TraceEvent("TLSConnectionSettingCAData").detail("CADataSize", ca_data.size());
policyInfo.ca_contents = Standalone<StringRef>(ca_data);
if (!policyVerifyPeersSet.get()->set_ca_data((const uint8_t*)&ca_data[0], ca_data.size()))
throw tls_error();
if (!policyVerifyPeersNotSet.get()->set_ca_data((const uint8_t*)&ca_data[0], ca_data.size()))
@ -244,6 +244,7 @@ void TLSOptions::set_cert_data( std::string const& cert_data ) {
init_plugin();
TraceEvent("TLSConnectionSettingCertData").detail("CertDataSize", cert_data.size());
policyInfo.cert_contents = Standalone<StringRef>(cert_data);
if ( !policyVerifyPeersSet.get()->set_cert_data( (const uint8_t*)&cert_data[0], cert_data.size() ) )
throw tls_error();
if (!policyVerifyPeersNotSet.get()->set_cert_data((const uint8_t*)&cert_data[0], cert_data.size()))
@ -273,6 +274,7 @@ void TLSOptions::set_key_data( std::string const& key_data ) {
init_plugin();
const char *passphrase = policyInfo.keyPassword.empty() ? NULL : policyInfo.keyPassword.c_str();
TraceEvent("TLSConnectionSettingKeyData").detail("KeyDataSize", key_data.size());
policyInfo.key_contents = Standalone<StringRef>(key_data);
if ( !policyVerifyPeersSet.get()->set_key_data( (const uint8_t*)&key_data[0], key_data.size(), passphrase) )
throw tls_error();
if (!policyVerifyPeersNotSet.get()->set_key_data((const uint8_t*)&key_data[0], key_data.size(), passphrase))
@ -310,7 +312,7 @@ void TLSOptions::register_network() {
}
ACTOR static Future<ErrorOr<Standalone<StringRef>>> readEntireFile( std::string filename ) {
state Reference<IAsyncFile> file = wait(IAsyncFileSystem::filesystem()->open(filename, IAsyncFile::OPEN_READONLY, 0));
state Reference<IAsyncFile> file = wait(IAsyncFileSystem::filesystem()->open(filename, IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED, 0));
state int64_t filesize = wait(file->size());
state Standalone<StringRef> buf = makeString(filesize);
int rc = wait(file->read(mutateString(buf), filesize, 0));
@ -347,6 +349,7 @@ ACTOR static Future<Void> reloadConfigurationOnChange( TLSOptions::PolicyInfo *p
}
Void _ = wait(delay(1.0));
}
state int mismatches = 0;
state AsyncVar<Standalone<StringRef>> ca_var;
state AsyncVar<Standalone<StringRef>> key_var;
state AsyncVar<Standalone<StringRef>> cert_var;
@ -359,14 +362,30 @@ ACTOR static Future<Void> reloadConfigurationOnChange( TLSOptions::PolicyInfo *p
state Future<Void> key_changed = key_var.onChange();
state Future<Void> cert_changed = cert_var.onChange();
Void _ = wait( ca_changed || key_changed || cert_changed );
if (ca_changed.isReady()) pci->ca_contents = ca_var.get();
if (key_changed.isReady()) pci->key_contents = key_var.get();
if (cert_changed.isReady()) pci->cert_contents = cert_var.get();
try {
Reference<ITLSPolicy> verifypeers = Reference<ITLSPolicy>(plugin->create_policy());
verifypeers->set_ca_data(pci->ca_contents.begin(), pci->ca_contents.size());
verifypeers->set_key_data(pci->key_contents.begin(), pci->key_contents.size(), pci->keyPassword.c_str());
verifypeers->set_cert_data(pci->cert_contents.begin(), pci->cert_contents.size());
if (ca_changed.isReady()) {
TraceEvent(SevInfo, "TLSRefreshCAChanged").detail("path", pci->ca_path).detail("length", ca_var.get().size());
pci->ca_contents = ca_var.get();
}
if (key_changed.isReady()) {
TraceEvent(SevInfo, "TLSRefreshKeyChanged").detail("path", pci->key_path).detail("length", key_var.get().size());
pci->key_contents = key_var.get();
}
if (cert_changed.isReady()) {
TraceEvent(SevInfo, "TLSRefreshCertChanged").detail("path", pci->cert_path).detail("length", cert_var.get().size());
pci->cert_contents = cert_var.get();
}
bool rc = true;
Reference<ITLSPolicy> verifypeers = Reference<ITLSPolicy>(plugin->create_policy());
Reference<ITLSPolicy> noverifypeers = Reference<ITLSPolicy>(plugin->create_policy());
loop {
// Don't actually loop. We're just using loop/break as a `goto err`.
// This loop always ends with an unconditional break.
rc = verifypeers->set_ca_data(pci->ca_contents.begin(), pci->ca_contents.size());
if (!rc) break;
rc = verifypeers->set_key_data(pci->key_contents.begin(), pci->key_contents.size(), pci->keyPassword.c_str());
if (!rc) break;
rc = verifypeers->set_cert_data(pci->cert_contents.begin(), pci->cert_contents.size());
if (!rc) break;
{
std::unique_ptr<const uint8_t *[]> verify_peers_arr(new const uint8_t*[pci->verify_peers.size()]);
std::unique_ptr<int[]> verify_peers_len(new int[pci->verify_peers.size()]);
@ -374,18 +393,27 @@ ACTOR static Future<Void> reloadConfigurationOnChange( TLSOptions::PolicyInfo *p
verify_peers_arr[i] = (const uint8_t *)&pci->verify_peers[i][0];
verify_peers_len[i] = pci->verify_peers[i].size();
}
verifypeers->set_verify_peers(pci->verify_peers.size(), verify_peers_arr.get(), verify_peers_len.get());
rc = verifypeers->set_verify_peers(pci->verify_peers.size(), verify_peers_arr.get(), verify_peers_len.get());
if (!rc) break;
}
Reference<ITLSPolicy> noverifypeers = Reference<ITLSPolicy>(plugin->create_policy());
noverifypeers->set_ca_data(pci->ca_contents.begin(), pci->ca_contents.size());
noverifypeers->set_key_data(pci->key_contents.begin(), pci->key_contents.size(), pci->keyPassword.c_str());
noverifypeers->set_cert_data(pci->cert_contents.begin(), pci->cert_contents.size());
rc = noverifypeers->set_ca_data(pci->ca_contents.begin(), pci->ca_contents.size());
if (!rc) break;
rc = noverifypeers->set_key_data(pci->key_contents.begin(), pci->key_contents.size(), pci->keyPassword.c_str());
if (!rc) break;
rc = noverifypeers->set_cert_data(pci->cert_contents.begin(), pci->cert_contents.size());
if (!rc) break;
break;
}
if (rc) {
TraceEvent(SevInfo, "TLSCertificateRefreshSucceeded");
realVerifyPeersPolicy->set(verifypeers);
realNoVerifyPeersPolicy->set(noverifypeers);
} catch (Error& e) {
mismatches = 0;
} else {
// Some files didn't match up, they should in the future, and we'll retry then.
TraceEvent(SevWarn, "TLSCertificateRefresh").error(e);
mismatches++;
TraceEvent(SevWarn, "TLSCertificateRefreshMismatch").detail("mismatches", mismatches);
}
}
}

View File

@ -94,8 +94,10 @@ public:
DatabaseConfiguration config; // Asynchronously updated via master registration
DatabaseConfiguration fullyRecoveredConfig;
Database db;
int unfinishedRecoveries;
int logGenerations;
DBInfo() : masterRegistrationCount(0), recoveryStalled(false), forceRecovery(false),
DBInfo() : masterRegistrationCount(0), recoveryStalled(false), forceRecovery(false), unfinishedRecoveries(0), logGenerations(0),
clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ),
serverInfo( new AsyncVar<ServerDBInfo>( ServerDBInfo( LiteralStringRef("DB") ) ) ),
db( DatabaseContext::create( clientInfo, Future<Void>(), LocalityData(), true, TaskDefaultEndpoint, true ) ) // SOMEDAY: Locality!
@ -1059,6 +1061,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
rmq.forceRecovery = db->forceRecovery;
cluster->masterProcessId = masterWorker.worker.first.locality.processId();
cluster->db.unfinishedRecoveries++;
ErrorOr<MasterInterface> newMaster = wait( masterWorker.worker.first.master.tryGetReply( rmq ) );
if (newMaster.present()) {
TraceEvent("CCWDB", cluster->id).detail("Recruited", newMaster.get().id());
@ -1345,7 +1348,7 @@ struct FailureStatusInfo {
};
//The failure monitor client relies on the fact that the failure detection server will not declare itself failed
ACTOR Future<Void> failureDetectionServer( UID uniqueID, FutureStream< FailureMonitoringRequest > requests ) {
ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData::DBInfo* db, FutureStream< FailureMonitoringRequest > requests ) {
state Version currentVersion = 0;
state std::map<NetworkAddress, FailureStatusInfo> currentStatus; // The status at currentVersion
state std::deque<SystemFailureStatus> statusHistory; // The last change in statusHistory is from currentVersion-1 to currentVersion
@ -1443,13 +1446,16 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, FutureStream< FailureMo
//TraceEvent("FailureDetectionPoll", uniqueID).detail("PivotDelay", pivotDelay).detail("Clients", currentStatus.size());
//TraceEvent("FailureDetectionAcceptableDelay").detail("Delay", acceptableDelay1000);
bool tooManyLogGenerations = std::max(db->unfinishedRecoveries, db->logGenerations) > CLIENT_KNOBS->FAILURE_MAX_GENERATIONS;
for(auto it = currentStatus.begin(); it != currentStatus.end(); ) {
double delay = t - it->second.lastRequestTime;
if ( it->first != g_network->getLocalAddress() && ( delay > pivotDelay * 2 + FLOW_KNOBS->SERVER_REQUEST_INTERVAL + CLIENT_KNOBS->FAILURE_MIN_DELAY || delay > CLIENT_KNOBS->FAILURE_MAX_DELAY ) ) {
if ( it->first != g_network->getLocalAddress() && ( tooManyLogGenerations ?
( delay > CLIENT_KNOBS->FAILURE_EMERGENCY_DELAY ) :
( delay > pivotDelay * 2 + FLOW_KNOBS->SERVER_REQUEST_INTERVAL + CLIENT_KNOBS->FAILURE_MIN_DELAY || delay > CLIENT_KNOBS->FAILURE_MAX_DELAY ) ) ) {
//printf("Failure Detection Server: Status of '%s' is now '%s' after %f sec\n", it->first.toString().c_str(), "Failed", now() - it->second.lastRequestTime);
TraceEvent("FailureDetectionStatus", uniqueID).detail("System", it->first).detail("Status","Failed").detail("Why", "Timeout").detail("LastRequestAge", delay)
.detail("PivotDelay", pivotDelay);
.detail("PivotDelay", pivotDelay).detail("UnfinishedRecoveries", db->unfinishedRecoveries).detail("LogGenerations", db->logGenerations);
statusHistory.push_back( SystemFailureStatus( it->first, FailureStatus(true) ) );
++currentVersion;
it = currentStatus.erase(it);
@ -1555,6 +1561,14 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
return;
}
if ( req.recoveryState == RecoveryState::FULLY_RECOVERED ) {
self->db.unfinishedRecoveries = 0;
self->db.logGenerations = 0;
ASSERT( !req.logSystemConfig.oldTLogs.size() );
} else {
self->db.logGenerations = std::max<int>(self->db.logGenerations, req.logSystemConfig.oldTLogs.size());
}
db->masterRegistrationCount = req.registrationCount;
db->recoveryStalled = req.recoveryStalled;
if ( req.configuration.present() ) {
@ -2135,7 +2149,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
state Future<ErrorOr<Void>> error = errorOr( actorCollection( addActor.getFuture() ) );
auto pSelf = &self;
addActor.send( failureDetectionServer( self.id, interf.clientInterface.failureMonitoring.getFuture() ) );
addActor.send( failureDetectionServer( self.id, &self.db, interf.clientInterface.failureMonitoring.getFuture() ) );
addActor.send( clusterWatchDatabase( &self, &self.db ) ); // Start the master database
addActor.send( self.updateWorkerList.init( self.db.db ) );
addActor.send( statusServer( interf.clientInterface.databaseStatus.getFuture(), &self, coordinators));

View File

@ -48,8 +48,12 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
Promise<Void> removed;
Future<Void> onRemoved;
Promise<Void> wakeUpTracker;
bool inDesiredDC;
LocalityEntry localityEntry;
TCServerInfo(StorageServerInterface ssi, ProcessClass processClass) : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()) {}
TCServerInfo(StorageServerInterface ssi, ProcessClass processClass, bool inDesiredDC, Reference<LocalitySet> storageServerSet) : id(ssi.id()), lastKnownInterface(ssi), lastKnownClass(processClass), dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()), inDesiredDC(inDesiredDC) {
localityEntry = ((LocalityMap<UID>*) storageServerSet.getPtr())->add(ssi.locality, &id);
}
};
ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
@ -260,7 +264,7 @@ struct ServerStatus {
bool isUnhealthy() const { return isFailed || isUndesired; }
const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : "Healthy"; }
bool operator == (ServerStatus const& r) const { return isFailed == r.isFailed && isUndesired == r.isUndesired && isWrongConfiguration == r.isWrongConfiguration && locality.zoneId() == r.locality.zoneId(); }
bool operator == (ServerStatus const& r) const { return isFailed == r.isFailed && isUndesired == r.isUndesired && isWrongConfiguration == r.isWrongConfiguration && locality == r.locality; }
//If a process has reappeared without the storage server that was on it (isFailed == true), we don't need to exclude it
//We also don't need to exclude processes who are in the wrong configuration (since those servers will be removed)
@ -305,11 +309,15 @@ ACTOR Future<Void> storageServerFailureTracker(
Version addedVersion )
{
loop {
bool unhealthy = statusMap->count(server.id()) && statusMap->get(server.id()).isUnhealthy();
if(unhealthy && !status->isUnhealthy()) {
(*unhealthyServers)--;
}
if(!unhealthy && status->isUnhealthy()) {
if( statusMap->count(server.id()) ) {
bool unhealthy = statusMap->get(server.id()).isUnhealthy();
if(unhealthy && !status->isUnhealthy()) {
(*unhealthyServers)--;
}
if(!unhealthy && status->isUnhealthy()) {
(*unhealthyServers)++;
}
} else if(status->isUnhealthy()) {
(*unhealthyServers)++;
}
@ -501,7 +509,7 @@ Future<Void> storageServerTracker(
Promise<Void> const& errorOut,
Version const& addedVersion);
Future<Void> teamTracker( struct DDTeamCollection* const& self, Reference<IDataDistributionTeam> const& team );
Future<Void> teamTracker( struct DDTeamCollection* const& self, Reference<TCTeamInfo> const& team );
struct DDTeamCollection {
enum { REQUESTING_WORKER = 0, GETTING_WORKER = 1, GETTING_STORAGE = 2 };
@ -550,6 +558,33 @@ struct DDTeamCollection {
Future<Void> readyToStart;
Future<Void> checkTeamDelay;
Reference<LocalitySet> storageServerSet;
std::vector<LocalityEntry> forcedEntries, resultEntries;
void resetLocalitySet() {
storageServerSet = Reference<LocalitySet>(new LocalityMap<UID>());
LocalityMap<UID>* storageServerMap = (LocalityMap<UID>*) storageServerSet.getPtr();
for( auto& it : server_info ) {
it.second->localityEntry = storageServerMap->add(it.second->lastKnownInterface.locality, &it.second->id);
}
}
bool satisfiesPolicy(const std::vector<Reference<TCServerInfo>>& team, int amount = -1) {
forcedEntries.clear();
resultEntries.clear();
if(amount == -1) {
amount = team.size();
}
for(int i = 0; i < amount; i++) {
forcedEntries.push_back(team[i]->localityEntry);
}
bool result = storageServerSet->selectReplicas(configuration.storagePolicy, forcedEntries, resultEntries);
return result && resultEntries.size() == 0;
}
DDTeamCollection(
Database const& cx,
UID masterId,
@ -564,7 +599,7 @@ struct DDTeamCollection {
Reference<AsyncVar<bool>> processingUnhealthy)
:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
configuration(configuration), serverChanges(serverChanges), readyToStart(readyToStart), checkTeamDelay( delay( SERVER_KNOBS->CHECK_TEAM_DELAY, TaskDataDistribution) ),
initialFailureReactionDelay( delayed( readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ),
initialFailureReactionDelay( delayed( readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskDataDistribution ) ), healthyTeamCount( 0 ), storageServerSet(new LocalityMap<UID>()),
initializationDoneActor(logOnCompletion(readyToStart && initialFailureReactionDelay, this)), optimalTeamCount( 0 ), recruitingStream(0), restartRecruiting( SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY ),
unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs), zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary), processingUnhealthy(processingUnhealthy)
{
@ -593,7 +628,7 @@ struct DDTeamCollection {
teamBuilder.cancel();
}
ACTOR Future<Void> logOnCompletion( Future<Void> signal, DDTeamCollection *self ) {
ACTOR static Future<Void> logOnCompletion( Future<Void> signal, DDTeamCollection *self ) {
Void _ = wait(signal);
Void _ = wait(delay(SERVER_KNOBS->LOG_ON_COMPLETION_DELAY, TaskDataDistribution));
@ -606,7 +641,7 @@ struct DDTeamCollection {
return Void();
}
ACTOR Future<Void> checkBuildTeams( DDTeamCollection* self ) {
ACTOR static Future<Void> checkBuildTeams( DDTeamCollection* self ) {
state Promise<Void> restart;
Void _ = wait( self->checkTeamDelay );
@ -640,7 +675,7 @@ struct DDTeamCollection {
// SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case)
// use keys, src, dest, metrics, priority, system load, etc.. to decide...
ACTOR Future<Void> getTeam( DDTeamCollection* self, GetTeamRequest req ) {
ACTOR static Future<Void> getTeam( DDTeamCollection* self, GetTeamRequest req ) {
try {
Void _ = wait( self->checkBuildTeams( self ) );
@ -799,53 +834,101 @@ struct DDTeamCollection {
return total;
}
void addSubsetOfEmergencyTeams() {
for( int i = 0; i < badTeams.size(); i++ ) {
auto& serverIds = badTeams[i]->getServerIDs();
bool foundTeam = false;
for( int j = 0; j < std::max( 1, (int)(serverIds.size() - configuration.storageTeamSize + 1) ) && !foundTeam; j++ ) {
auto& serverTeams = server_info[serverIds[j]]->teams;
for( int k = 0; k < serverTeams.size(); k++ ) {
auto &testTeam = serverTeams[k]->getServerIDs();
bool allInTeam = true;
for( int l = 0; l < testTeam.size(); l++ ) {
if( std::find( serverIds.begin(), serverIds.end(), testTeam[l] ) == serverIds.end() ) {
allInTeam = false;
ACTOR static Future<Void> addSubsetOfEmergencyTeams( DDTeamCollection *self ) {
state int idx = 0;
state std::vector<Reference<TCServerInfo>> servers;
state std::vector<UID> serverIds;
state Reference<LocalitySet> tempSet = Reference<LocalitySet>(new LocalityMap<UID>());
state LocalityMap<UID>* tempMap = (LocalityMap<UID>*) tempSet.getPtr();
for(; idx < self->badTeams.size(); idx++ ) {
servers.clear();
for(auto server : self->badTeams[idx]->servers) {
if(server->inDesiredDC) {
servers.push_back(server);
}
}
if(servers.size() >= self->configuration.storageTeamSize) {
bool foundTeam = false;
for( int j = 0; j < servers.size() - self->configuration.storageTeamSize + 1 && !foundTeam; j++ ) {
auto& serverTeams = self->server_info[servers[j]->id]->teams;
for( int k = 0; k < serverTeams.size(); k++ ) {
auto &testTeam = serverTeams[k]->getServerIDs();
bool allInTeam = true;
for( int l = 0; l < testTeam.size(); l++ ) {
bool foundServer = false;
for( auto it : servers ) {
if( it->id == testTeam[l] ) {
foundServer = true;
break;
}
}
if(!foundServer) {
allInTeam = false;
break;
}
}
if( allInTeam ) {
foundTeam = true;
break;
}
}
if( allInTeam ) {
foundTeam = true;
break;
}
if( !foundTeam ) {
if( self->satisfiesPolicy(servers) ) {
if(servers.size() == self->configuration.storageTeamSize || self->satisfiesPolicy(servers, self->configuration.storageTeamSize)) {
servers.resize(self->configuration.storageTeamSize);
self->addTeam(servers);
} else {
tempSet->clear();
for( auto it : servers ) {
tempMap->add(it->lastKnownInterface.locality, &it->id);
}
self->resultEntries.clear();
self->forcedEntries.clear();
bool result = tempSet->selectReplicas(self->configuration.storagePolicy, self->forcedEntries, self->resultEntries);
ASSERT(result && self->resultEntries.size() == self->configuration.storageTeamSize);
serverIds.clear();
for(auto& it : self->resultEntries) {
serverIds.push_back(*tempMap->getObject(it));
}
self->addTeam(serverIds.begin(), serverIds.end());
}
} else {
serverIds.clear();
for(auto it : servers) {
serverIds.push_back(it->id);
}
TraceEvent(SevWarnAlways, "CannotAddSubset", self->masterId).detail("Servers", describe(serverIds));
}
}
}
if( !foundTeam ) {
addTeam(serverIds.begin(), serverIds.begin() + configuration.storageTeamSize );
}
Void _ = wait( yield() );
}
return Void();
}
void init( InitialDataDistribution const& initTeams ) {
ACTOR static Future<Void> init( DDTeamCollection *self, Reference<InitialDataDistribution> initTeams ) {
// SOMEDAY: If some servers have teams and not others (or some servers have more data than others) and there is an address/locality collision, should
// we preferentially mark the least used server as undesirable?
for (auto i = initTeams.allServers.begin(); i != initTeams.allServers.end(); ++i) {
if (shouldHandleServer(i->first)) {
addServer(i->first, i->second, serverTrackerErrorOut, 0);
for (auto i = initTeams->allServers.begin(); i != initTeams->allServers.end(); ++i) {
if (self->shouldHandleServer(i->first)) {
self->addServer(i->first, i->second, self->serverTrackerErrorOut, 0);
}
}
if(primary) {
for(auto t = initTeams.primaryTeams.begin(); t != initTeams.primaryTeams.end(); ++t) {
addTeam(t->begin(), t->end() );
}
} else {
for(auto t = initTeams.remoteTeams.begin(); t != initTeams.remoteTeams.end(); ++t) {
addTeam(t->begin(), t->end() );
}
state std::set<std::vector<UID>>::iterator teamIter = self->primary ? initTeams->primaryTeams.begin() : initTeams->remoteTeams.begin();
state std::set<std::vector<UID>>::iterator teamIterEnd = self->primary ? initTeams->primaryTeams.end() : initTeams->remoteTeams.end();
for(; teamIter != teamIterEnd; ++teamIter) {
self->addTeam(teamIter->begin(), teamIter->end() );
Void _ = wait( yield() );
}
addSubsetOfEmergencyTeams();
Void _ = wait( addSubsetOfEmergencyTeams(self) );
return Void();
}
void evaluateTeamQuality() {
@ -899,8 +982,18 @@ struct DDTeamCollection {
return exists;
}
void addTeam( std::set<UID> const& team ) {
addTeam(team.begin(), team.end());
void addTeam( const vector<Reference<TCServerInfo>>& newTeamServers ) {
Reference<TCTeamInfo> teamInfo( new TCTeamInfo( newTeamServers ) );
teamInfo->tracker = teamTracker( this, teamInfo );
if( teamInfo->servers.size() > configuration.storageTeamSize ) {
badTeams.push_back( teamInfo );
} else {
teams.push_back( teamInfo );
for (int i=0;i<newTeamServers.size();i++) {
server_info[ newTeamServers[i]->id ]->teams.push_back( teamInfo );
}
}
}
template<class InputIt>
@ -912,17 +1005,11 @@ struct DDTeamCollection {
}
}
Reference<TCTeamInfo> teamInfo( new TCTeamInfo( newTeamServers ) );
TraceEvent("TeamCreation", masterId).detail("Team", teamInfo->getDesc());
teamInfo->tracker = teamTracker( this, teamInfo );
if( teamInfo->servers.size() > configuration.storageTeamSize ) {
badTeams.push_back( teamInfo );
} else {
teams.push_back( teamInfo );
for (int i=0;i<newTeamServers.size();i++) {
server_info[ newTeamServers[i]->id ]->teams.push_back( teamInfo );
}
}
addTeam( newTeamServers );
}
void addTeam( std::set<UID> const& team ) {
addTeam(team.begin(), team.end());
}
ACTOR Future<Void> addAllTeams( DDTeamCollection *self, int location, vector<LocalityEntry>* history, Reference<LocalityMap<UID>> processes, vector<std::vector<UID>>* output, int teamLimit, int* addedTeams ) {
@ -1203,7 +1290,7 @@ struct DDTeamCollection {
allServers.push_back( newServer.id() );
TraceEvent("AddedStorageServer", masterId).detail("ServerID", newServer.id()).detail("ProcessClass", processClass.toString()).detail("WaitFailureToken", newServer.waitFailure.getEndpoint().token).detail("Address", newServer.waitFailure.getEndpoint().address);
auto &r = server_info[newServer.id()] = Reference<TCServerInfo>( new TCServerInfo( newServer, processClass ) );
auto &r = server_info[newServer.id()] = Reference<TCServerInfo>( new TCServerInfo( newServer, processClass, includedDCs.empty() || std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end(), storageServerSet ) );
r->tracker = storageServerTracker( this, cx, r.getPtr(), &server_status, lock, masterId, &server_info, serverChanges, errorOut, addedVersion );
restartTeamBuilder.trigger();
}
@ -1242,6 +1329,14 @@ struct DDTeamCollection {
}
server_info.erase( removedServer );
if(server_status.count(removedServer) && server_status.get(removedServer).isUnhealthy()) {
unhealthyServers--;
}
server_status.clear( removedServer );
//FIXME: add remove support to localitySet so we do not have to recreate it
resetLocalitySet();
// remove all teams that contain removedServer
// SOMEDAY: can we avoid walking through all teams, since we have an index of teams in which removedServer participated
for(int t=0; t<teams.size(); t++) {
@ -1270,7 +1365,7 @@ struct DDTeamCollection {
};
// Track a team and issue RelocateShards when the level of degradation changes
ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistributionTeam> team) {
ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<TCTeamInfo> team) {
state int lastServersLeft = team->getServerIDs().size();
state bool lastAnyUndesired = false;
state bool wrongSize = team->getServerIDs().size() != self->configuration.storageTeamSize;
@ -1303,28 +1398,29 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
auto servers = team->getServerIDs();
bool anyUndesired = false;
bool anyWrongConfiguration = false;
Reference<LocalityGroup> teamLocality(new LocalityGroup());
int serversLeft = 0;
for(auto s = servers.begin(); s != servers.end(); ++s) {
change.push_back( self->server_status.onChange( *s ) );
auto& status = self->server_status.get(*s);
if (!status.isFailed)
teamLocality->add( status.locality );
if (status.isUndesired)
if (!status.isFailed) {
serversLeft++;
}
if (status.isUndesired) {
anyUndesired = true;
if (status.isWrongConfiguration)
}
if (status.isWrongConfiguration) {
anyWrongConfiguration = true;
}
}
int serversLeft = teamLocality->size();
bool matchesPolicy = self->configuration.storagePolicy->validate(teamLocality->getEntries(), teamLocality);
if( !self->initialFailureReactionDelay.isReady() ) {
change.push_back( self->initialFailureReactionDelay );
}
change.push_back( self->zeroHealthyTeams->onChange() );
bool recheck = (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get())) && (!matchesPolicy || anyUndesired || team->getServerIDs().size() != self->configuration.storageTeamSize);
bool healthy = self->satisfiesPolicy(team->servers) && !anyUndesired && team->getServerIDs().size() == self->configuration.storageTeamSize && serversLeft == self->configuration.storageTeamSize;
bool recheck = !healthy && (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get()));
lastReady = self->initialFailureReactionDelay.isReady();
lastZeroHealthy = self->zeroHealthyTeams->get();
@ -1334,9 +1430,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
.detail("LastServersLeft", lastServersLeft).detail("ContainsUndesiredServer", anyUndesired)
.detail("HealthyTeamsCount", self->healthyTeamCount).detail("IsWrongConfiguration", anyWrongConfiguration);
bool healthy = matchesPolicy && !anyUndesired && team->getServerIDs().size() == self->configuration.storageTeamSize && team->getServerIDs().size() == serversLeft;
team->setHealthy( healthy ); // Unhealthy teams won't be chosen by bestTeam
team->setWrongConfiguration( anyWrongConfiguration );
bool optimal = team->isOptimal() && healthy;
@ -1744,11 +1838,19 @@ ACTOR Future<Void> storageServerTracker(
}
when( std::pair<StorageServerInterface, ProcessClass> newInterface = wait( interfaceChanged ) ) {
bool restartRecruiting = newInterface.first.waitFailure.getEndpoint().address != server->lastKnownInterface.waitFailure.getEndpoint().address;
bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
TraceEvent("StorageServerInterfaceChanged", masterId).detail("ServerID", server->id)
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token);
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged);
server->lastKnownInterface = newInterface.first;
server->lastKnownClass = newInterface.second;
if(localityChanged) {
server->inDesiredDC = (self->includedDCs.empty() || std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) != self->includedDCs.end());
self->resetLocalitySet();
}
interfaceChanged = server->onInterfaceChanged;
if(changes.present()) {
changes.get().send( std::make_pair(server->id, server->lastKnownInterface) );
@ -1988,7 +2090,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(
state Future<Void> error = actorCollection( self.addActor.getFuture() );
try {
self.init( *initData );
Void _ = wait( self.init( &self, initData ) );
initData = Reference<InitialDataDistribution>();
self.addActor.send(serverGetTeamRequests(tci, &self));
@ -2328,10 +2430,7 @@ DDTeamCollection* testTeamCollection(int teamSize, IRepPolicyRef policy, int pro
interface.locality.set(LiteralStringRef("machineid"), Standalone<StringRef>(std::to_string(id)));
interface.locality.set(LiteralStringRef("zoneid"), Standalone<StringRef>(std::to_string(id % 5)));
interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3)));
collection->server_info[uid] = Reference<TCServerInfo>(new TCServerInfo(
interface,
ProcessClass()
));
collection->server_info[uid] = Reference<TCServerInfo>(new TCServerInfo(interface, ProcessClass(), true, collection->storageServerSet));
collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
}

View File

@ -343,7 +343,7 @@ struct DDQueueData {
int activeRelocations;
int queuedRelocations;
int bytesWritten;
int64_t bytesWritten;
int teamSize;
std::map<UID, Busyness> busymap;

View File

@ -78,7 +78,7 @@ protected:
extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false );
extern IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit );
extern IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent );
extern IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery );
inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string const& filename, UID logID, int64_t memoryLimit, bool checkChecksums=false, bool checkIntegrity=false ) {
switch( storeType ) {
@ -97,4 +97,4 @@ inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string con
Future<Void> GenerateIOLogChecksumFile(std::string const & filename);
Future<Void> KVFileCheck(std::string const & filename, bool const &integrity);
#endif
#endif

View File

@ -56,7 +56,7 @@ extern bool noUnseed;
class KeyValueStoreMemory : public IKeyValueStore, NonCopyable {
public:
KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent );
KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery );
// IClosable
virtual Future<Void> getError() { return log->getError(); }
@ -427,7 +427,7 @@ private:
return log->push( LiteralStringRef("\x01") ); // Changes here should be reflected in OP_DISK_OVERHEAD
}
ACTOR static Future<Void> recover( KeyValueStoreMemory* self ) {
ACTOR static Future<Void> recover( KeyValueStoreMemory* self, bool exactRecovery ) {
// 'uncommitted' variables track something that might be rolled back by an OpRollback, and are copied into permanent variables
// (in self) in OpCommit. OpRollback does the reverse (copying the permanent versions over the uncommitted versions)
// the uncommitted and committed variables should be equal initially (to whatever makes sense if there are no committed transactions recovered)
@ -559,6 +559,11 @@ private:
}
if (zeroFillSize) {
if( exactRecovery ) {
TraceEvent(SevError, "KVSMemExpectedExact", self->id);
ASSERT(false);
}
TEST( true ); // Fixing a partial commit at the end of the KeyValueStoreMemory log
for(int i=0; i<zeroFillSize; i++)
self->log->push( StringRef((const uint8_t*)"",1) );
@ -704,11 +709,11 @@ private:
}
};
KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent )
KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery )
: log(log), id(id), previousSnapshotEnd(-1), currentSnapshotEnd(-1), resetSnapshot(false), memoryLimit(memoryLimit), committedWriteBytes(0),
committedDataSize(0), transactionSize(0), transactionIsLarge(false), disableSnapshot(disableSnapshot), replaceContent(replaceContent), snapshotCount(0), firstCommitWithSnapshot(true)
{
recovering = recover( this );
recovering = recover( this, exactRecovery );
snapshotting = snapshot( this );
commitActors = actorCollection( addActor.getFuture() );
}
@ -716,9 +721,9 @@ KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memor
IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit ) {
TraceEvent("KVSMemOpening", logID).detail("Basename", basename).detail("MemoryLimit", memoryLimit);
IDiskQueue *log = openDiskQueue( basename, logID );
return new KeyValueStoreMemory( log, logID, memoryLimit, false, false );
return new KeyValueStoreMemory( log, logID, memoryLimit, false, false, false );
}
IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent ) {
return new KeyValueStoreMemory( queue, logID, memoryLimit, disableSnapshot, replaceContent );
IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery ) {
return new KeyValueStoreMemory( queue, logID, memoryLimit, disableSnapshot, replaceContent, exactRecovery );
}

View File

@ -305,6 +305,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MOVE_KEYS_KRM_LIMIT, 2000 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT = 2;
init( MOVE_KEYS_KRM_LIMIT_BYTES, 1e5 ); if( randomize && BUGGIFY ) MOVE_KEYS_KRM_LIMIT_BYTES = 5e4; //This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an attempt to read a key range map
init( MAX_SKIP_TAGS, 1 ); //The TLogs require tags to be densely packed to be memory efficient, so be careful increasing this knob
init( MAX_ADDED_SOURCES_MULTIPLIER, 2.0 );
//FdbServer
bool longReboots = randomize && BUGGIFY;

View File

@ -247,6 +247,7 @@ public:
int MOVE_KEYS_KRM_LIMIT;
int MOVE_KEYS_KRM_LIMIT_BYTES; //This must be sufficiently larger than CLIENT_KNOBS->KEY_SIZE_LIMIT (fdbclient/Knobs.h) to ensure that at least two entries will be returned from an attempt to read a key range map
int MAX_SKIP_TAGS;
double MAX_ADDED_SOURCES_MULTIPLIER;
//FdbServer
double MIN_REBOOT_TIME;

View File

@ -206,6 +206,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
Void _ = wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
commitMessages(self, ver, messages);
self->version.set( ver );
Void _ = wait(yield(TaskTLogCommit));
//TraceEvent("LogRouterVersion").detail("Ver",ver);
}
lastVer = ver;
@ -217,6 +218,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
if(ver > self->version.get()) {
Void _ = wait(self->minPopped.whenAtLeast(std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
self->version.set( ver );
Void _ = wait(yield(TaskTLogCommit));
}
break;
}

View File

@ -1232,7 +1232,7 @@ ACTOR Future<Void> masterProxyServerCore(
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, false);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true);
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
// ((SERVER_MEM_LIMIT * COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL) / COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR) is only a approximate formula for limiting the memory used.
// COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR is an estimate based on experiments and not an accurate one.

View File

@ -118,16 +118,62 @@ Future<Void> removeOldDestinations(Transaction *tr, UID oldDest, VectorRef<KeyRa
return waitForAll(actors);
}
ACTOR Future<vector<vector<Optional<UID>>>> findReadWriteDestinations(Standalone<RangeResultRef> shards, UID relocationIntervalId, Transaction* tr) {
ACTOR Future<vector<UID>> addReadWriteDestinations(KeyRangeRef shard, vector<StorageServerInterface> srcInterfs, vector<StorageServerInterface> destInterfs, Version version, int desiredHealthy, int maxServers) {
if(srcInterfs.size() >= maxServers) {
return vector<UID>();
}
state vector< Future<Optional<UID>> > srcChecks;
for(int s=0; s<srcInterfs.size(); s++) {
srcChecks.push_back( checkReadWrite( srcInterfs[s].getShardState.getReplyUnlessFailedFor( GetShardStateRequest( shard, GetShardStateRequest::NO_WAIT), SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, 0, TaskMoveKeys ), srcInterfs[s].id(), 0 ) );
}
state vector< Future<Optional<UID>> > destChecks;
for(int s=0; s<destInterfs.size(); s++) {
destChecks.push_back( checkReadWrite( destInterfs[s].getShardState.getReplyUnlessFailedFor( GetShardStateRequest( shard, GetShardStateRequest::NO_WAIT), SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, 0, TaskMoveKeys ), destInterfs[s].id(), version ) );
}
Void _ = wait( waitForAll(srcChecks) && waitForAll(destChecks) );
int healthySrcs = 0;
for(auto it : srcChecks) {
if( it.get().present() ) {
healthySrcs++;
}
}
vector<UID> result;
int totalDesired = std::min<int>(desiredHealthy - healthySrcs, maxServers - srcInterfs.size());
for(int s = 0; s < destInterfs.size() && result.size() < totalDesired; s++) {
if(destChecks[s].get().present()) {
result.push_back(destChecks[s].get().get());
}
}
return result;
}
ACTOR Future<vector<vector<UID>>> additionalSources(Standalone<RangeResultRef> shards, Transaction* tr, int desiredHealthy, int maxServers) {
vector<Future<Optional<Value>>> serverListEntries;
std::set<UID> fetching;
for(int i = 0; i < shards.size() - 1; ++i) {
vector<UID> src;
vector<UID> dest;
decodeKeyServersValue( shards[i].value, src, dest );
for(int s=0; s<src.size(); s++) {
if(!fetching.count(src[s])) {
fetching.insert(src[s]);
serverListEntries.push_back( tr->get( serverListKeyFor(src[s]) ) );
}
}
for(int s=0; s<dest.size(); s++) {
serverListEntries.push_back( tr->get( serverListKeyFor(dest[s]) ) );
if(!fetching.count(dest[s])) {
fetching.insert(dest[s]);
serverListEntries.push_back( tr->get( serverListKeyFor(dest[s]) ) );
}
}
}
@ -140,29 +186,31 @@ ACTOR Future<vector<vector<Optional<UID>>>> findReadWriteDestinations(Standalone
ssiMap[ssi.id()] = ssi;
}
vector<Future<vector<Optional<UID>>>> allChecks;
vector<Future<vector<UID>>> allChecks;
for(int i = 0; i < shards.size() - 1; ++i) {
KeyRangeRef rangeIntersectKeys( shards[i].key, shards[i+1].key );
vector<UID> src;
vector<UID> dest;
vector<StorageServerInterface> storageServerInterfaces;
vector<StorageServerInterface> srcInterfs;
vector<StorageServerInterface> destInterfs;
decodeKeyServersValue( shards[i].value, src, dest );
for(int s=0; s<dest.size(); s++)
storageServerInterfaces.push_back( ssiMap[dest[s]] );
vector< Future<Optional<UID>> > checks;
for(int s=0; s<storageServerInterfaces.size(); s++) {
checks.push_back( checkReadWrite( storageServerInterfaces[s].getShardState.getReplyUnlessFailedFor(
GetShardStateRequest( rangeIntersectKeys, GetShardStateRequest::NO_WAIT), SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, 0, TaskMoveKeys ), dest[s], tr->getReadVersion().get() ) );
for(int s=0; s<src.size(); s++) {
srcInterfs.push_back( ssiMap[src[s]] );
}
allChecks.push_back(getAll(checks));
for(int s=0; s<dest.size(); s++) {
if( std::find(src.begin(), src.end(), dest[s]) == dest.end() ) {
destInterfs.push_back( ssiMap[dest[s]] );
}
}
allChecks.push_back(addReadWriteDestinations(rangeIntersectKeys, srcInterfs, destInterfs, tr->getReadVersion().get(), desiredHealthy, maxServers));
}
vector<vector<Optional<UID>>> readWriteDestinations = wait(getAll(allChecks));
return readWriteDestinations;
vector<vector<UID>> result = wait(getAll(allChecks));
return result;
}
// Set keyServers[keys].dest = servers
@ -238,7 +286,7 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
// printf("'%s': '%s'\n", old[i].key.toString().c_str(), old[i].value.toString().c_str());
//Check that enough servers for each shard are in the correct state
vector<vector<Optional<UID>>> readWriteDestinations = wait(findReadWriteDestinations(old, relocationIntervalId, &tr));
vector<vector<UID>> addAsSource = wait(additionalSources(old, &tr, servers.size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER*servers.size()));
// For each intersecting range, update keyServers[range] dest to be servers and clear existing dest servers from serverKeys
for(int i = 0; i < old.size() - 1; ++i) {
@ -254,10 +302,8 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
.detail("OldDest", describe(dest))
.detail("ReadVersion", tr.getReadVersion().get());*/
for(auto& uid : readWriteDestinations[i]) {
if(uid.present()) {
src.push_back(uid.get());
}
for(auto& uid : addAsSource[i]) {
src.push_back(uid);
}
uniquify(src);

View File

@ -1297,16 +1297,23 @@ static int getExtraTLogEligibleMachines(vector<std::pair<WorkerInterface, Proces
if(configuration.regions.size() == 0) {
return allMachines.size() - std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize);
}
int extraTlogEligibleMachines = std::numeric_limits<int>::max();
int extraTlogEligibleMachines = configuration.usableRegions == 1 ? 0 : std::numeric_limits<int>::max();
for(auto& region : configuration.regions) {
extraTlogEligibleMachines = std::min<int>( extraTlogEligibleMachines, dcId_machine[region.dcId].size() - std::max(configuration.remoteTLogReplicationFactor, std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize) ) );
int eligible = dcId_machine[region.dcId].size() - std::max(configuration.remoteTLogReplicationFactor, std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize) );
//FIXME: does not take into account fallback satellite policies
if(region.satelliteTLogReplicationFactor > 0) {
int totalSatelliteEligible = 0;
for(auto& sat : region.satellites) {
totalSatelliteEligible += dcId_machine[sat.dcId].size();
}
extraTlogEligibleMachines = std::min<int>( extraTlogEligibleMachines, totalSatelliteEligible - region.satelliteTLogReplicationFactor );
eligible = std::min<int>( eligible, totalSatelliteEligible - region.satelliteTLogReplicationFactor );
}
if( configuration.usableRegions == 1 ) {
if( region.priority >= 0 ) {
extraTlogEligibleMachines = std::max( extraTlogEligibleMachines, eligible );
}
} else {
extraTlogEligibleMachines = std::min( extraTlogEligibleMachines, eligible );
}
}
return extraTlogEligibleMachines;

View File

@ -1477,14 +1477,14 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
Void _ = wait( delayJittered(.005, TaskTLogCommit) );
}
if(logData->stopped) {
return Void();
}
state Version ver = 0;
state std::vector<TagsAndMessage> messages;
loop {
if(logData->stopped) {
return Void();
}
Version ver = 0;
std::vector<TagsAndMessage> messages;
while (true) {
bool foundMessage = r->hasMessage();
state bool foundMessage = r->hasMessage();
if (!foundMessage || r->version().version != ver) {
ASSERT(r->version().version > lastVer);
if (ver) {
@ -1519,6 +1519,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
//FIXME: could we just use the ver and lastVer variables, or replace them with this?
self->prevVersion = logData->version.get();
logData->version.set( ver );
Void _ = wait( yield(TaskTLogCommit) );
}
lastVer = ver;
ver = r->version().version;
@ -1556,6 +1557,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
//FIXME: could we just use the ver and lastVer variables, or replace them with this?
self->prevVersion = logData->version.get();
logData->version.set( ver );
Void _ = wait( yield(TaskTLogCommit) );
}
break;
}

View File

@ -596,7 +596,7 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
// Recover transaction state store
if(self->txnStateStore) self->txnStateStore->close();
self->txnStateLogAdapter = openDiskQueueAdapter( oldLogSystem, txsTag );
self->txnStateStore = keyValueStoreLogSystem( self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false );
self->txnStateStore = keyValueStoreLogSystem( self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false, true );
// Versionstamped operations (particularly those applied from DR) define a minimum commit version
// that we may recover to, as they embed the version in user-readable data and require that no

View File

@ -408,6 +408,7 @@ public:
Counter mutations, setMutations, clearRangeMutations, atomicMutations;
Counter updateBatches, updateVersions;
Counter loops;
Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount;
Counters(StorageServer* self)
: cc("StorageServer", self->thisServerID.toString()),
@ -429,7 +430,11 @@ public:
atomicMutations("AtomicMutations", cc),
updateBatches("UpdateBatches", cc),
updateVersions("UpdateVersions", cc),
loops("Loops", cc)
loops("Loops", cc),
fetchWaitingMS("FetchWaitingMS", cc),
fetchWaitingCount("FetchWaitingCount", cc),
fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc)
{
specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; });
specialCounter(cc, "Version", [self](){ return self->version.get(); });
@ -1814,6 +1819,10 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
Void _ = wait( data->fetchKeysParallelismLock.take( TaskDefaultYield, fetchBlockBytes ) );
state FlowLock::Releaser holdingFKPL( data->fetchKeysParallelismLock, fetchBlockBytes );
state double executeStart = now();
++data->counters.fetchWaitingCount;
data->counters.fetchWaitingMS += 1000*(executeStart - startt);
Void _ = wait(delay(0));
shard->phase = AddingShard::Fetching;
@ -2005,6 +2014,9 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
validate(data);
++data->counters.fetchExecutingCount;
data->counters.fetchExecutingMS += 1000*(now() - executeStart);
TraceEvent(SevDebug, interval.end(), data->thisServerID);
} catch (Error &e){
TraceEvent(SevDebug, interval.end(), data->thisServerID).error(e, true).detail("Version", data->version.get());

View File

@ -985,7 +985,7 @@ struct ConsistencyCheckWorkload : TestWorkload
}
if(bytesReadInRange > 0) {
TraceEvent("ConsistencyCheck_ReadRange").detail("Range", printable(range)).detail("BytesRead", bytesReadInRange);
TraceEvent("ConsistencyCheck_ReadRange").suppressFor(1.0).detail("Range", printable(range)).detail("BytesRead", bytesReadInRange);
}
}

View File

@ -24,6 +24,8 @@
#include "workloads.h"
#include "fdbclient/StatusClient.h"
#include "flow/UnitTest.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/ManagementAPI.h"
extern bool noUnseed;
@ -31,7 +33,7 @@ struct StatusWorkload : TestWorkload {
double testDuration, requestsPerSecond;
PerfIntCounter requests, replies, errors, totalSize;
Optional<StatusObject> statusSchema;
Optional<StatusObject> parsedSchema;
StatusWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx),
@ -39,14 +41,14 @@ struct StatusWorkload : TestWorkload {
{
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
requestsPerSecond = getOption(options, LiteralStringRef("requestsPerSecond"), 0.5);
auto statusSchemaStr = getOption(options, LiteralStringRef("schema"), StringRef());
auto statusSchemaStr = getOption(options, LiteralStringRef("schema"), JSONSchemas::statusSchema);
if (statusSchemaStr.size()) {
json_spirit::mValue schema;
json_spirit::read_string( statusSchemaStr.toString(), schema );
statusSchema = schema.get_obj();
parsedSchema = schema.get_obj();
// This is sort of a hack, but generate code coverage *requirements* for everything in schema
schemaCoverageRequirements(statusSchema.get());
schemaCoverageRequirements(parsedSchema.get());
}
noUnseed = true;
@ -80,30 +82,19 @@ struct StatusWorkload : TestWorkload {
m.push_back(errors.getMetric());
}
template <bool Covered=true>
static void schemaCoverage( std::string const& spath ) {
static std::set<std::string> coveredSchemaPaths;
if (coveredSchemaPaths.insert(spath).second) {
TraceEvent ev(SevInfo, "CodeCoverage");
ev.detail("File", "documentation/StatusSchema.json/" + spath).detail("Line", 0);
if (!Covered)
ev.detail("Covered", 0);
}
}
static void schemaCoverageRequirements( StatusObject const& schema, std::string schema_path = std::string() ) {
try {
for(auto& skv : schema) {
std::string spath = schema_path + "." + skv.first;
schemaCoverage<false>(spath);
schemaCoverage(spath, false);
if (skv.second.type() == json_spirit::array_type && skv.second.get_array().size()) {
schemaCoverageRequirements( skv.second.get_array()[0].get_obj(), spath + "[0]" );
} else if (skv.second.type() == json_spirit::obj_type) {
if (skv.second.get_obj().count("$enum")) {
for(auto& enum_item : skv.second.get_obj().at("$enum").get_array())
schemaCoverage<false>(spath + ".$enum." + enum_item.get_str());
schemaCoverage(spath + ".$enum." + enum_item.get_str(), false);
} else
schemaCoverageRequirements( skv.second.get_obj(), spath );
}
@ -117,125 +108,6 @@ struct StatusWorkload : TestWorkload {
}
}
static json_spirit::Value_type normJSONType(json_spirit::Value_type type) {
if (type == json_spirit::int_type)
return json_spirit::real_type;
return type;
}
static bool schemaMatch( StatusObject const schema, StatusObject const result, Severity sev=SevError, std::string path = std::string(), std::string schema_path = std::string() ) {
// Returns true if everything in `result` is permitted by `schema`
// Really this should recurse on "values" rather than "objects"?
bool ok = true;
try {
for(auto& rkv : result) {
auto& key = rkv.first;
auto& rv = rkv.second;
std::string kpath = path + "." + key;
std::string spath = schema_path + "." + key;
schemaCoverage(spath);
if (!schema.count(key)) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaPath", spath);
ok = false;
continue;
}
auto& sv = schema.at(key);
if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$enum")) {
auto& enum_values = sv.get_obj().at("$enum").get_array();
bool any_match = false;
for(auto& enum_item : enum_values)
if (enum_item == rv) {
any_match = true;
schemaCoverage(spath + ".$enum." + enum_item.get_str());
break;
}
if (!any_match) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaEnumItems", enum_values.size()).detail("Value", json_spirit::write_string(rv));
schemaCoverage(spath + ".$enum." + json_spirit::write_string(rv));
ok = false;
}
} else if (sv.type() == json_spirit::obj_type && sv.get_obj().count("$map")) {
if (rv.type() != json_spirit::obj_type) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if(sv.get_obj().at("$map").type() != json_spirit::obj_type) {
continue;
}
auto& schema_obj = sv.get_obj().at("$map").get_obj();
auto& value_obj = rv.get_obj();
schemaCoverage(spath + ".$map");
for(auto& value_pair : value_obj) {
auto vpath = kpath + "[" + value_pair.first + "]";
auto upath = spath + ".$map";
if (value_pair.second.type() != json_spirit::obj_type) {
TraceEvent(sev, "SchemaMismatch").detail("Path", vpath).detail("ValueType", value_pair.second.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_pair.second.get_obj(), sev, vpath, upath))
ok = false;
}
} else {
// The schema entry isn't an operator, so it asserts a type and (depending on the type) recursive schema definition
if (normJSONType(sv.type()) != normJSONType(rv.type())) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
ok = false;
continue;
}
if (rv.type() == json_spirit::array_type) {
auto& value_array = rv.get_array();
auto& schema_array = sv.get_array();
if (!schema_array.size()) {
// An empty schema array means that the value array is required to be empty
if (value_array.size()) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaSize", schema_array.size()).detail("ValueSize", value_array.size());
ok = false;
continue;
}
} else if (schema_array.size() == 1 && schema_array[0].type() == json_spirit::obj_type) {
// A one item schema array means that all items in the value must match the first item in the schema
auto& schema_obj = schema_array[0].get_obj();
int index = 0;
for(auto &value_item : value_array) {
if (value_item.type() != json_spirit::obj_type) {
TraceEvent(sev, "SchemaMismatch").detail("Path", kpath + format("[%d]",index)).detail("ValueType", value_item.type());
ok = false;
continue;
}
if (!schemaMatch(schema_obj, value_item.get_obj(), sev, kpath + format("[%d]", index), spath + "[0]"))
ok = false;
index++;
}
} else
ASSERT(false); // Schema doesn't make sense
} else if (rv.type() == json_spirit::obj_type) {
auto& schema_obj = sv.get_obj();
auto& value_obj = rv.get_obj();
if (!schemaMatch(schema_obj, value_obj, sev, kpath, spath))
ok = false;
}
}
}
return ok;
} catch (std::exception& e) {
TraceEvent(SevError, "SchemaMatchException").detail("What", e.what()).detail("Path", path).detail("SchemaPath", schema_path);
throw unknown_error();
}
}
ACTOR Future<Void> fetcher(Reference<ClusterConnectionFile> connFile, StatusWorkload *self) {
state double lastTime = now();
@ -251,8 +123,8 @@ struct StatusWorkload : TestWorkload {
save(br, result);
self->totalSize += br.getLength();
TraceEvent("StatusWorkloadReply").detail("ReplySize", br.getLength()).detail("Latency", now() - issued);//.detail("Reply", json_spirit::write_string(json_spirit::mValue(result)));
if (self->statusSchema.present() && !schemaMatch(self->statusSchema.get(), result) )
std::string errorStr;
if (self->parsedSchema.present() && !schemaMatch(self->parsedSchema.get(), result, errorStr, SevError, true) )
TraceEvent(SevError, "StatusWorkloadValidationFailed").detail("JSON", json_spirit::write_string(json_spirit::mValue(result)));
}
catch (Error& e) {
@ -277,7 +149,8 @@ TEST_CASE("fdbserver/status/schema/basic") {
json_spirit::mValue test;
json_spirit::read_string( t, test );
TraceEvent("SchemaMatch").detail("Schema", json_spirit::write_string(schema)).detail("Value", json_spirit::write_string(test)).detail("Expect", expect_ok);
ASSERT( expect_ok == StatusWorkload::schemaMatch(schema.get_obj(), test.get_obj(), expect_ok ? SevError : SevInfo) );
std::string errorStr;
ASSERT( expect_ok == schemaMatch(schema.get_obj(), test.get_obj(), errorStr, expect_ok ? SevError : SevInfo, true) );
};
check(true, "{}");
check(true, "{\"apple\":4}");
@ -293,4 +166,4 @@ TEST_CASE("fdbserver/status/schema/basic") {
check(false, "{\"mapped\":{\"item1\":{\"x\":false},\"item2\":{\"y\":1}}}");
return Void();
}
}

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{87E8D968-B3DE-4C62-82E8-4429423D159C}'
Id='{90C29CFB-37C1-42F3-80B2-A1560145DA10}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,7 +1,7 @@
<?xml version="1.0"?>
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Version>6.0.9</Version>
<Version>6.0.10</Version>
<PackageName>6.0</PackageName>
</PropertyGroup>
</Project>