Merge pull request #905 from etschannen/master

merge release 6.0 into master
This commit is contained in:
Evan Tschannen 2018-11-10 13:09:40 -08:00 committed by GitHub
commit 59a204b287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
126 changed files with 1406 additions and 706 deletions

View File

@ -345,7 +345,7 @@ bool FDBLibTLSSession::verify_peer() {
if (!rc) {
// log the various failure reasons
for (std::string reason : verify_failure_reasons) {
TraceEvent(reason.c_str(), uid);
TraceEvent(reason.c_str(), uid).suppressFor(1.0);
}
}

View File

@ -58,14 +58,17 @@ class Result:
self.key_tuple = subspace.unpack(key)
self.values = values
def matches(self, rhs, specification):
def matches_key(self, rhs, specification):
if not isinstance(rhs, Result):
return False
left_key = self.key_tuple[specification.key_start_index:]
right_key = self.key_tuple[specification.key_start_index:]
right_key = rhs.key_tuple[specification.key_start_index:]
if len(left_key) != len(right_key) or left_key != right_key:
return left_key == right_key
def matches(self, rhs, specification):
if not self.matches_key(rhs, specification):
return False
for value in self.values:

View File

@ -90,7 +90,7 @@ class ResultSet(object):
if any([s is not None for s in sequence_nums]):
results = {i: r for i, r in results.items() if r.sequence_num(self.specification) == min(sequence_nums)}
else:
results = {i: r for i, r in results.items() if r.matches(min(results.values()), self.specification)}
results = {i: r for i, r in results.items() if r.matches_key(min(results.values()), self.specification)}
for i in results.keys():
indices[i] += 1

View File

@ -181,8 +181,8 @@ class InstructionSet(TestInstructions, list):
tr[subspace.pack((start + i,))] = instruction.to_value()
def insert_operations(self, db, subspace):
for i in range(0, int(math.ceil(len(self) / 1000.0))):
self._insert_operations_transactional(db, subspace, i * 1000, 1000)
for i in range(0, int(math.ceil(len(self) / 5000.0))):
self._insert_operations_transactional(db, subspace, i * 5000, 5000)
class ThreadedInstructionSet(TestInstructions):

View File

@ -0,0 +1,91 @@
#
# tuple.py
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import random
import struct
import fdb
import fdb.tuple
from bindingtester import FDB_API_VERSION
from bindingtester import util
from bindingtester.tests import Test, Instruction, InstructionSet, ResultSpecification
from bindingtester.tests import test_util
fdb.api_version(FDB_API_VERSION)
class TupleTest(Test):
def __init__(self, subspace):
super(TupleTest, self).__init__(subspace)
self.workspace = self.subspace['workspace'] # The keys and values here must match between subsequent runs of the same test
self.stack_subspace = self.subspace['stack']
def setup(self, args):
self.max_int_bits = args.max_int_bits
self.api_version = args.api_version
def generate(self, args, thread_number):
instructions = InstructionSet()
min_value = -2**self.max_int_bits+1
max_value = 2**self.max_int_bits-1
instructions.append('NEW_TRANSACTION')
# Test integer encoding
mutations = 0
for i in range(0, self.max_int_bits+1):
for sign in [-1, 1]:
sign_str = '' if sign == 1 else '-'
for offset in range(-10, 11):
val = (2**i) * sign + offset
if val >= min_value and val <= max_value:
if offset == 0:
add_str = ''
elif offset > 0:
add_str = '+%d' % offset
else:
add_str = '%d' % offset
instructions.push_args(1, val)
instructions.append('TUPLE_PACK')
instructions.push_args(self.workspace.pack(('%s2^%d%s' % (sign_str, i, add_str),)))
instructions.append('SET')
mutations += 1
if mutations >= 5000:
test_util.blocking_commit(instructions)
mutations = 0
instructions.begin_finalization()
test_util.blocking_commit(instructions)
instructions.push_args(self.stack_subspace.key())
instructions.append('LOG_STACK')
test_util.blocking_commit(instructions)
return instructions
def get_result_specifications(self):
return [
ResultSpecification(self.workspace, global_error_filter=[1007, 1021]),
ResultSpecification(self.stack_subspace, key_start_index=1, ordering_index=1, global_error_filter=[1007, 1021]),
]

View File

@ -286,6 +286,8 @@ func (ri *RangeIterator) MustGet() KeyValue {
return kv
}
// Strinc returns the first key that would sort outside the range prefixed by
// prefix, or an error if prefix is empty or contains only 0xFF bytes.
func Strinc(prefix []byte) ([]byte, error) {
for i := len(prefix) - 1; i >= 0; i-- {
if prefix[i] != 0xFF {
@ -311,7 +313,7 @@ func PrefixRange(prefix []byte) (KeyRange, error) {
copy(begin, prefix)
end, e := Strinc(begin)
if e != nil {
return KeyRange{}, nil
return KeyRange{}, e
}
return KeyRange{Key(begin), Key(end)}, nil
}

View File

@ -547,12 +547,21 @@ Virtual machines
Processes running in different VMs on a single machine will appear to FoundationDB as being hardware isolated. FoundationDB takes pains to assure that data replication is protected from hardware-correlated failures. If FoundationDB is run in multiple VMs on a single machine this protection will be subverted. An administrator can inform FoundationDB of this hardware sharing, however, by specifying a machine ID using the ``locality_machineid`` parameter in :ref:`foundationdb.conf <foundationdb-conf>`. All processes on VMs that share hardware should specify the same ``locality_machineid``.
Datacenters
------------
-----------
FoundationDB is datacenter aware and supports operation across datacenters. In a multiple-datacenter configuration, it is recommended that you set the :ref:`redundancy mode <configuration-choosing-redundancy-mode>` to ``three_datacenter`` and that you set the ``locality_dcid`` parameter for all FoundationDB processes in :ref:`foundationdb.conf <foundationdb-conf>`.
If you specify the ``--datacenter_id`` option to any FoundationDB process in your cluster, you should specify it to all such processes. Processes which do not have a specified datacenter ID on the command line are considered part of a default "unset" datacenter. FoundationDB will incorrectly believe that these processes are failure-isolated from other datacenters, which can reduce performance and fault tolerance.
(Re)creating a database
-----------------------
Installing FoundationDB packages usually creates a new database on the cluster automatically. However, if a cluster does not have a database configured (because the package installation failed to create it, you deleted your data files, or you did not install from the packages, etc.), then you may need to create it manually using the ``configure new`` command in ``fdbcli`` with the desired redundancy mode and storage engine::
> configure new single memory
.. warning:: In a cluster that hasn't been configured, running ``configure new`` will cause the processes in the cluster to delete all data files in their data directories. If a process is reusing an existing data directory, be sure to backup any files that you want to keep. Do not use ``configure new`` to fix a previously working cluster that reports itself missing unless you are certain any necessary data files are safe.
.. _administration-removing:
Uninstalling

View File

@ -242,7 +242,7 @@
.. |option-tls-plugin-blurb| replace::
Sets the :ref:`TLS plugin <configuring-tls-plugin>` to load. This option, if used, must be set before any other TLS options.
Sets the :ref:`TLS plugin <configuring-tls>` to load. This option, if used, must be set before any other TLS options.
.. |option-tls-cert-path-blurb| replace::

View File

@ -1064,7 +1064,7 @@ the most part, this also implies that ``T == fdb.tuple.unpack(fdb.tuple.pack(T))
.. method:: pack(tuple, prefix=b'')
Returns a key (byte string) encoding the specified tuple. If ``prefix`` is set, it will prefix the serialized
bytes with the prefix string. This throws an error if any of the tuple's items are incomplete `Versionstamp`
bytes with the prefix string. This throws an error if any of the tuple's items are incomplete :class:`Versionstamp`
instances.
.. method:: pack_with_versionstamp(tuple, prefix=b'')
@ -1074,8 +1074,8 @@ the most part, this also implies that ``T == fdb.tuple.unpack(fdb.tuple.pack(T))
recurse down nested tuples if there are any to find one.) If so, it will produce a byte string
that can be fed into :meth:`fdb.Transaction.set_versionstamped_key` and correctly fill in the
versionstamp information at commit time so that when the key is re-read and deserialized, the
only difference is that the `Versionstamp` instance is complete and has the transaction version
filled in. This throws an error if there are no incomplete `Versionstamp` instances in the tuple
only difference is that the :class:`Versionstamp` instance is complete and has the transaction version
filled in. This throws an error if there are no incomplete :class:`Versionstamp` instances in the tuple
or if there is more than one.
.. method:: unpack(key)

View File

@ -870,7 +870,7 @@ All future objects are a subclass of the :class:`Future` type.
|future-cancel-blurb|
.. classmethod:: Future.wait_for_any(*futures) -> Fixnum
.. classmethod:: Future.wait_for_any(\*futures) -> Fixnum
Does not return until at least one of the given future objects is ready. Returns the index in the parameter list of a ready future object.

View File

@ -1,8 +1,8 @@
.. _backups:
######################
######################################################
Backup, Restore, and Replication for Disaster Recovery
######################
######################################################
.. include:: guide-common.rst.inc
@ -323,7 +323,7 @@ Optionally, the user can specify a minimum RESTORABILITY guarantee with one of t
.. program:: fdbbackup describe
``describe``
----------
------------
The ``describe`` subcommand will analyze the given backup and print a summary of the snapshot and mutation data versions it contains as well as the version range of restorability the backup can currently provide.

View File

@ -99,7 +99,7 @@ For large clusters, you can manually set the allocated number of processes of a
Set the process using ``configure [proxies|resolvers|logs]=<N>``, where ``<N>`` is an integer greater than 0, or -1 to reset the value to its default.
For recommendations on appropriate values for process types in large clusters, see :ref:`configuration-large-cluster-performance`.
For recommendations on appropriate values for process types in large clusters, see :ref:`guidelines-process-class-config`.
coordinators
------------

View File

@ -263,7 +263,8 @@ Contains default parameters for all fdbserver processes on this machine. These s
* ``locality_dcid``: Data center identifier key. All processes physically located in a data center should share the id. No default value. If you are depending on data center based replication this must be set on all processes.
* ``locality_data_hall``: Data hall identifier key. All processes physically located in a data hall should share the id. No default value. If you are depending on data hall based replication this must be set on all processes.
* ``io_trust_seconds``: Time in seconds that a read or write operation is allowed to take before timing out with an error. If an operation times out, all future operations on that file will fail with an error as well. Only has an effect when using AsyncFileKAIO in Linux. If unset, defaults to 0 which means timeout is disabled.
.. note:: In addition to the options above, TLS settings as described for the :ref:`TLS plugin <configuring-tls-plugin>` can be specified in the [fdbserver] section.
.. note:: In addition to the options above, TLS settings as described for the :ref:`TLS plugin <configuring-tls>` can be specified in the [fdbserver] section.
``[fdbserver.<ID>]`` section(s)
---------------------------------

2
documentation/sphinx/source/guide-common.rst.inc Normal file → Executable file
View File

@ -31,7 +31,7 @@
``coordinators auto`` selects processes based on IP address. If your cluster has processes on the same machine with different IP addresses, ``coordinators auto`` may select a set of coordinators that are not fault tolerant. To ensure maximal fault tolerance, we recommend selecting coordinators according to the criteria in :ref:`configuration-choosing-coordination-servers` and setting them manually.
.. |conf-file-change-detection| replace::
Whenever the ``foundationdb.conf`` file changes, the ``fdbmonitor`` daemon automatically detects the changes and starts, stops, or restarts child processes as necessary.
Whenever the ``foundationdb.conf`` file changes, the ``fdbmonitor`` daemon automatically detects the changes and starts, stops, or restarts child processes as necessary. Note that changes to the configuration file contents must be made *atomically*. It is recommended to save the modified file to a temporary filename and then move/rename it into place, replacing the original. Some text editors do this automatically when saving.
.. |package-deb-clients| replace::
foundationdb-clients\_\ |release|\ -1\_amd64.deb

View File

@ -14,7 +14,7 @@ Documentation
FoundationDB is a robust choice for a broad range of use cases:
**Developers can store all types of data.** FoundationDB is multi-model, meaning you can store many types data in a single database. All data is safely stored, distributed, and replicated in FoundationDB.
**Developers can store all types of data.** FoundationDB is multi-model, meaning you can store many types of data in a single database. All data is safely stored, distributed, and replicated in FoundationDB.
**Administrators easily scale and handle hardware failures.** FoundationDB is easy to install, grow, and manage. It has a distributed architecture that gracefully scales out and handles faults while acting like a single ACID database.

View File

@ -25,7 +25,7 @@ These limitations come from fundamental design decisions and are unlikely to cha
Large transactions
------------------
Transaction size cannot exceed 10,000,000 bytes of affected data. Keys, values, and ranges that you read or write are all included as affected data. Likewise, conflict ranges that you :ref:`add <api-python-conflict-ranges>` or remove (using a :ref:`snapshot read <api-python-snapshot-reads>` or a :ref:`transaction option <api-python-no-write-conflict-range>`) are also added or removed from the scope of affected data.
Transaction size cannot exceed 10,000,000 bytes of affected data. Keys, values, and ranges that you write are included as affected data. Keys and ranges that you read are also included as affected data, but values that you read are not. Likewise, conflict ranges that you :ref:`add <api-python-conflict-ranges>` or remove (using a :ref:`snapshot read <api-python-snapshot-reads>` or a :ref:`transaction option <api-python-no-write-conflict-range>`) are also added or removed from the scope of affected data.
If any single transaction exceeds one megabyte of affected data, you should modify your design. In the current version, these large transactions can cause performance issues and database availability can (briefly) be impacted.
@ -76,7 +76,7 @@ Anyone who can connect to a FoundationDB cluster can read and write every key in
Current limitations
===================
These limitations do not reflect fundamental aspects of our design and are likely be resolved or mitigated in future versions. Administrators should be aware of these issues, but longer-term application development should be less driven by them.
These limitations do not reflect fundamental aspects of our design and are likely to be resolved or mitigated in future versions. Administrators should be aware of these issues, but longer-term application development should be less driven by them.
.. _long-transactions:

View File

@ -5,7 +5,7 @@ Local Development
Download the FoundationDB package
=================================
:doc:`Download the FoundationDB package <downloads>` for macOS (FoundationDB-*.pkg) onto your local development machine.
:doc:`Download the FoundationDB package <downloads>` for macOS (FoundationDB-\*.pkg) onto your local development machine.
Install the FoundationDB binaries
=================================

View File

@ -10,7 +10,7 @@ Language support
* FoundationDB now supports :doc:`Ruby </api-ruby>`
* FoundationDB now supports :doc:`Node.js </api-node>`
* FoundationDB now supports Node.js
* FoundationDB now supports `Java </javadoc/index.html>`_ and other JVM languages.

View File

@ -223,12 +223,12 @@ Node
----
* Support for API version 200 and backwards compatibility with previous API versions.
* New APIs for allocating and managing keyspace (:ref:`Directory <developer-guide-directories>`).
* Support for the :ref:`Promise/A+ specification <api-node-promises>` with supporting utilities.
* Support for the Promise/A+ specification with supporting utilities.
* Futures can take multiple callbacks. Callbacks can be added if the original function was called with a callback. The Future type is exposed in our binding.
* Added ``as_foundationdb_key`` and ``as_foundationdb_value`` support.
* Node prints a stack trace if an error occurs in a callback from V8.
* Snapshot transactions can be used in retry loops.
* The :ref:`methods <api-node-setAndWatch>` ``db.setAndWatch`` and ``db.clearAndWatch`` now return an object with a watch member instead of a future.
* The methods ``db.setAndWatch`` and ``db.clearAndWatch`` now return an object with a watch member instead of a future.
* Fix: Could not use the ``'this'`` pointer with the retry decorator.
* Fix: Node transactional decorator didn't return a result to the caller if the function was called with a transaction.
* Fix: The program could sometimes crash when watches were manually cancelled.

View File

@ -47,7 +47,7 @@ Fixes
Java
----
* The `ReadTransaction` interface supports the ability to set transaction options.
* The ``ReadTransaction`` interface supports the ability to set transaction options.
Other Changes
-------------

View File

@ -100,7 +100,7 @@ Parameters and client bindings
------------------------------
The default LibreSSL-based implementation
=================================
=========================================
FoundationDB offers TLS based on the LibreSSL library. By default, it will be enabled automatically when participating in a TLS-enabled cluster.
@ -230,7 +230,7 @@ Field Well known name
``subjectAltName`` Subject Alternative Name
================== ========================
Within a subject alternative name requirement, the value specified is required to have the form ``prefix:value``, where the prefix specifies the type of value being matched against. The following prefixes are supported.
Within a subject alternative name requirement, the value specified is required to have the form ``prefix:value``, where the prefix specifies the type of value being matched against. The following prefixes are supported:
====== ===========================
Prefix Well known name
@ -239,7 +239,7 @@ DNS Domain Name
URI Uniform Resource Identifier
IP IP Address
EMAIL Email Address
====== ============================
====== ===========================
The following operators are supported:

View File

@ -30,12 +30,12 @@
#include "fdbclient/Status.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbrpc/Platform.h"
#include "fdbrpc/BlobStore.h"
#include "fdbclient/BlobStore.h"
#include "fdbclient/json_spirit/json_spirit_writer_template.h"
#include "fdbrpc/Platform.h"
#include <stdarg.h>
#include <stdio.h>
#include <algorithm> // std::transform

View File

@ -1495,11 +1495,18 @@ ACTOR Future<Void> commitTransaction( Reference<ReadYourWritesTransaction> tr )
ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Reference<ClusterConnectionFile> ccf, LineNoise* linenoise, Future<Void> warn ) {
state ConfigurationResult::Type result;
state int startToken = 1;
state bool force = false;
if (tokens.size() < 2)
result = ConfigurationResult::NO_OPTIONS_PROVIDED;
else {
if(tokens[startToken] == LiteralStringRef("FORCE")) {
force = true;
startToken = 2;
}
state Optional<ConfigureAutoResult> conf;
if( tokens[1] == LiteralStringRef("auto") ) {
if( tokens[startToken] == LiteralStringRef("auto") ) {
StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( ccf )) );
if(warn.isValid())
warn.cancel();
@ -1559,7 +1566,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
}
}
ConfigurationResult::Type r = wait( makeInterruptable( changeConfig( db, std::vector<StringRef>(tokens.begin()+1,tokens.end()), conf) ) );
ConfigurationResult::Type r = wait( makeInterruptable( changeConfig( db, std::vector<StringRef>(tokens.begin()+startToken,tokens.end()), conf, force) ) );
result = r;
}
@ -1571,7 +1578,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
case ConfigurationResult::CONFLICTING_OPTIONS:
case ConfigurationResult::UNKNOWN_OPTION:
case ConfigurationResult::INCOMPLETE_CONFIGURATION:
printUsage(tokens[0]);
printUsage(LiteralStringRef("configure"));
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
@ -1586,6 +1593,26 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
printf("Database created\n");
ret=false;
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
printf("ERROR: The database is unavailable\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID:
printf("ERROR: All storage servers must be in one of the known regions\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions=2, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing from usable_regions=1 to usable_regions=2, only one region can have priority >= 0\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::SUCCESS:
printf("Configuration changed\n");
ret=false;
@ -1597,7 +1624,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
return ret;
}
ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDatabase) {
ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDatabase, bool force) {
std::string contents(readFileBytes(filePath, 100000));
json_spirit::mValue config;
if(!json_spirit::read_string( contents, config )) {
@ -1637,7 +1664,7 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
return true;
}
}
ConfigurationResult::Type result = wait( makeInterruptable( changeConfig(db, configString) ) );
ConfigurationResult::Type result = wait( makeInterruptable( changeConfig(db, configString, force) ) );
// Real errors get thrown from makeInterruptable and printed by the catch block in cli(), but
// there are various results specific to changeConfig() that we need to report:
bool ret;
@ -1670,6 +1697,26 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
printf("Database created\n");
ret=false;
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
printf("ERROR: The database is unavailable\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID:
printf("ERROR: All storage servers must be in one of the known regions\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions=2, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing from usable_regions=1 to usable_regions=2, only one region can have priority >= 0\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::SUCCESS:
printf("Configuration changed\n");
ret=false;
@ -2520,8 +2567,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "fileconfigure")) {
if (tokens.size() == 2 || (tokens.size() == 3 && tokens[1] == LiteralStringRef("new"))) {
bool err = wait( fileConfigure( db, tokens.back().toString(), tokens.size() == 3 ) );
if (tokens.size() == 2 || (tokens.size() == 3 && (tokens[1] == LiteralStringRef("new") || tokens[1] == LiteralStringRef("FORCE")) )) {
bool err = wait( fileConfigure( db, tokens.back().toString(), tokens[1] == LiteralStringRef("new"), tokens[1] == LiteralStringRef("FORCE") ) );
if (err) is_error = true;
} else {
printUsage(tokens[0]);

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/AsyncFileBlobStore.actor.h"
#include "fdbclient/AsyncFileBlobStore.actor.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/UnitTest.h"

View File

@ -34,9 +34,9 @@
#include "flow/serialize.h"
#include "flow/Net2Packet.h"
#include "fdbrpc/IRateControl.h"
#include "fdbrpc/BlobStore.h"
#include "fdbrpc/md5/md5.h"
#include "fdbrpc/libb64/encode.h"
#include "fdbclient/BlobStore.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR template<typename T> static Future<T> joinErrorGroup(Future<T> f, Promise<Void> p) {

View File

@ -22,9 +22,9 @@
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/Hash3.h"
#include "fdbrpc/AsyncFileBlobStore.actor.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/Platform.h"
#include "fdbclient/AsyncFileBlobStore.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/ReadYourWrites.h"

View File

@ -18,11 +18,11 @@
* limitations under the License.
*/
#include "fdbrpc/BlobStore.h"
#include "fdbclient/BlobStore.h"
#include "fdbrpc/md5/md5.h"
#include "fdbrpc/libb64/encode.h"
#include "fdbrpc/sha1/SHA1.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include "fdbclient/sha1/SHA1.h"
#include <time.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>

View File

@ -26,8 +26,8 @@
#include "flow/Net2Packet.h"
#include "fdbclient/Knobs.h"
#include "fdbrpc/IRateControl.h"
#include "fdbrpc/HTTP.h"
#include "fdbrpc/JSONDoc.h"
#include "fdbclient/HTTP.h"
#include "fdbclient/JSONDoc.h"
// Representation of all the things you need to connect to a blob store instance with some credentials.
// Reference counted because a very large number of them could be needed.

View File

@ -54,8 +54,11 @@ void parseReplicationPolicy(IRepPolicyRef* policy, ValueRef const& v) {
void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
try {
StatusObject statusObj = BinaryReader::fromStringRef<StatusObject>(v, IncludeVersion());
StatusArray regionArray = statusObj["regions"].get_array();
regions->clear();
if(statusObj["regions"].type() != json_spirit::array_type) {
return;
}
StatusArray regionArray = statusObj["regions"].get_array();
for (StatusObjectReader dc : regionArray) {
RegionInfo info;
json_spirit::mArray datacenters;

View File

@ -32,22 +32,19 @@
#include "fdbclient/EventTypes.actor.h"
#include "fdbrpc/ContinuousSample.h"
class LocationInfo : public MultiInterface<StorageServerInterface> {
class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
public:
static Reference<LocationInfo> getInterface( DatabaseContext *cx, std::vector<StorageServerInterface> const& alternatives, LocalityData const& clientLocality );
static Reference<StorageServerInfo> getInterface( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality );
void notifyContextDestroyed();
virtual ~LocationInfo();
virtual ~StorageServerInfo();
private:
DatabaseContext *cx;
LocationInfo( DatabaseContext* cx, vector<StorageServerInterface> const& shards, LocalityData const& clientLocality ) : cx(cx), MultiInterface( shards, clientLocality ) {}
StorageServerInfo( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality ) : cx(cx), ReferencedInterface<StorageServerInterface>(interf, locality) {}
};
class ProxyInfo : public MultiInterface<MasterProxyInterface> {
public:
ProxyInfo( vector<MasterProxyInterface> const& proxies, LocalityData const& clientLocality ) : MultiInterface( proxies, clientLocality, ALWAYS_FRESH ) {}
};
typedef MultiInterface<ReferencedInterface<StorageServerInterface>> LocationInfo;
typedef MultiInterface<MasterProxyInterface> ProxyInfo;
class DatabaseContext : public ReferenceCounted<DatabaseContext>, NonCopyable {
public:
@ -122,7 +119,7 @@ public:
int locationCacheSize;
CoalescedKeyRangeMap< Reference<LocationInfo> > locationCache;
std::map< std::vector<UID>, LocationInfo* > ssid_locationInfo;
std::map< UID, StorageServerInfo* > server_interf;
Standalone<StringRef> dbId;

View File

@ -91,6 +91,11 @@ static std::string describe( const int item ) {
return format("%d", item);
}
template <class T>
static std::string describe( Reference<T> const& item ) {
return item->toString();
}
template <class T>
static std::string describe( T const& item ) {
return item.toString();

View File

@ -18,11 +18,11 @@
* limitations under the License.
*/
#include "fdbrpc/HTTP.h"
#include "fdbrpc/md5/md5.h"
#include "fdbrpc/libb64/encode.h"
#include "fdbclient/HTTP.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include <cctype>
#include "fdbrpc/xml2json.hpp"
#include "fdbclient/xml2json.hpp"
namespace HTTP {

2
fdbclient/JsonBuilder.h Executable file → Normal file
View File

@ -5,7 +5,7 @@
#include <cmath>
#include "flow/flow.h"
#include "flow/Trace.h"
#include "fdbrpc/JSONDoc.h"
#include "fdbclient/JSONDoc.h"
class JsonBuilder;
class JsonBuilderObject;

View File

@ -51,6 +51,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( DEFAULT_MAX_BACKOFF, 1.0 );
init( BACKOFF_GROWTH_RATE, 2.0 );
init( RESOURCE_CONSTRAINED_MAX_BACKOFF, 30.0 );
init( PROXY_COMMIT_OVERHEAD_BYTES, 23 ); //The size of serializing 7 tags (3 primary, 3 remote, 1 log router) + 2 for the tag length
init( TRANSACTION_SIZE_LIMIT, 1e7 );
init( KEY_SIZE_LIMIT, 1e4 );
@ -61,7 +62,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MAX_BATCH_SIZE, 20 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1; // Note that SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE is set to match this value
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
init( LOCATION_CACHE_EVICTION_SIZE, 100000 );
init( LOCATION_CACHE_EVICTION_SIZE, 300000 );
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
init( GET_RANGE_SHARD_LIMIT, 2 );

View File

@ -49,6 +49,7 @@ public:
double DEFAULT_MAX_BACKOFF;
double BACKOFF_GROWTH_RATE;
double RESOURCE_CONSTRAINED_MAX_BACKOFF;
int PROXY_COMMIT_OVERHEAD_BYTES;
int64_t TRANSACTION_SIZE_LIMIT;
int64_t KEY_SIZE_LIMIT;

View File

@ -268,39 +268,108 @@ ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration( Database cx ) {
}
}
ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std::string, std::string> m ) {
ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std::string, std::string> m, bool force ) {
state StringRef initIdKey = LiteralStringRef( "\xff/init_id" );
state Transaction tr(cx);
if (!m.size())
if (!m.size()) {
return ConfigurationResult::NO_OPTIONS_PROVIDED;
}
// make sure we have essential configuration options
std::string initKey = configKeysPrefix.toString() + "initialized";
state bool creating = m.count( initKey ) != 0;
if (creating) {
m[initIdKey.toString()] = g_random->randomUniqueID().toString();
if (!isCompleteConfiguration(m))
if (!isCompleteConfiguration(m)) {
return ConfigurationResult::INCOMPLETE_CONFIGURATION;
} else {
state Future<DatabaseConfiguration> fConfig = getDatabaseConfiguration(cx);
wait( success(fConfig) || delay(1.0) );
if(fConfig.isReady()) {
DatabaseConfiguration config = fConfig.get();
for(auto kv : m) {
config.set(kv.first, kv.second);
}
if(!config.isValid()) {
return ConfigurationResult::INVALID_CONFIGURATION;
}
}
}
state Future<Void> tooLong = delay(4.5);
loop {
try {
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
if(!creating && !force) {
state Future<Standalone<RangeResultRef>> fConfig = tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY);
wait( success(fConfig) || tooLong );
if(!fConfig.isReady()) {
return ConfigurationResult::DATABASE_UNAVAILABLE;
}
if(fConfig.isReady()) {
ASSERT( fConfig.get().size() < CLIENT_KNOBS->TOO_MANY );
state DatabaseConfiguration oldConfig;
oldConfig.fromKeyValues((VectorRef<KeyValueRef>) fConfig.get());
state DatabaseConfiguration newConfig = oldConfig;
for(auto kv : m) {
newConfig.set(kv.first, kv.second);
}
if(!newConfig.isValid()) {
return ConfigurationResult::INVALID_CONFIGURATION;
}
if(oldConfig.usableRegions==1 && newConfig.usableRegions==2) {
//must only have one region with priority >= 0
int activeRegionCount = 0;
for(auto& it : newConfig.regions) {
if(it.priority >= 0) {
activeRegionCount++;
}
}
if(activeRegionCount > 1) {
return ConfigurationResult::MULTIPLE_ACTIVE_REGIONS;
}
}
state Future<Standalone<RangeResultRef>> fServerList = (newConfig.regions.size()) ? tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) : Future<Standalone<RangeResultRef>>();
if(newConfig.usableRegions==2) {
//all regions with priority >= 0 must be fully replicated
state std::vector<Future<Optional<Value>>> replicasFutures;
for(auto& it : newConfig.regions) {
if(it.priority >= 0) {
replicasFutures.push_back(tr.get(datacenterReplicasKeyFor(it.dcId)));
}
}
wait( waitForAll(replicasFutures) || tooLong );
for(auto& it : replicasFutures) {
if(!it.isReady()) {
return ConfigurationResult::DATABASE_UNAVAILABLE;
}
if(!it.get().present()) {
return ConfigurationResult::REGION_NOT_FULLY_REPLICATED;
}
}
}
if(newConfig.regions.size()) {
//all storage servers must be in one of the regions
wait( success(fServerList) || tooLong );
if(!fServerList.isReady()) {
return ConfigurationResult::DATABASE_UNAVAILABLE;
}
Standalone<RangeResultRef> serverList = fServerList.get();
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
std::set<Key> newDcIds;
for(auto& it : newConfig.regions) {
newDcIds.insert(it.dcId);
}
for(auto& s : serverList) {
auto ssi = decodeServerListValue( s.value );
if ( !ssi.locality.dcId().present() || !newDcIds.count(ssi.locality.dcId().get()) ) {
return ConfigurationResult::STORAGE_IN_UNKNOWN_DCID;
}
}
}
}
}
if (creating) {
tr.setOption( FDBTransactionOptions::INITIALIZE_NEW_DATABASE );
tr.addReadConflictRange( singleKeyRange( initIdKey ) );
@ -636,7 +705,7 @@ ACTOR Future<ConfigurationResult::Type> autoConfig( Database cx, ConfigureAutoRe
}
}
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf ) {
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf, bool force ) {
if( modes.size() && modes[0] == LiteralStringRef("auto") && conf.present() ) {
return autoConfig(cx, conf.get());
}
@ -645,16 +714,16 @@ Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<
auto r = buildConfiguration( modes, m );
if (r != ConfigurationResult::SUCCESS)
return r;
return changeConfig(cx, m);
return changeConfig(cx, m, force);
}
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& modes ) {
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& modes, bool force ) {
TraceEvent("ChangeConfig").detail("Mode", modes);
std::map<std::string,std::string> m;
auto r = buildConfiguration( modes, m );
if (r != ConfigurationResult::SUCCESS)
return r;
return changeConfig(cx, m);
return changeConfig(cx, m, force);
}
ACTOR Future<vector<ProcessData>> getWorkers( Transaction* tr ) {

View File

@ -49,6 +49,10 @@ public:
INVALID_CONFIGURATION,
DATABASE_ALREADY_CREATED,
DATABASE_CREATED,
DATABASE_UNAVAILABLE,
STORAGE_IN_UNKNOWN_DCID,
REGION_NOT_FULLY_REPLICATED,
MULTIPLE_ACTIVE_REGIONS,
SUCCESS
};
};
@ -104,11 +108,11 @@ ConfigurationResult::Type buildConfiguration( std::string const& modeString, std
bool isCompleteConfiguration( std::map<std::string, std::string> const& options );
// All versions of changeConfig apply the given set of configuration tokens to the database, and return a ConfigurationResult (or error).
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& configMode ); // Accepts tokens separated by spaces in a single string
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& configMode, bool force ); // Accepts tokens separated by spaces in a single string
ConfigureAutoResult parseConfig( StatusObject const& status );
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf ); // Accepts a vector of configuration tokens
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::map<std::string, std::string> const& m ); // Accepts a full configuration in key/value format (from buildConfiguration)
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf, bool force ); // Accepts a vector of configuration tokens
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::map<std::string, std::string> const& m, bool const& force ); // Accepts a full configuration in key/value format (from buildConfiguration)
Future<DatabaseConfiguration> getDatabaseConfiguration( Database const& cx );
Future<Void> waitForFullReplication( Database const& cx );

View File

@ -28,6 +28,7 @@
struct MasterProxyInterface {
enum { LocationAwareLoadBalance = 1 };
enum { AlwaysFresh = 1 };
LocalityData locality;
RequestStream< struct CommitTransactionRequest > commit;
@ -96,12 +97,12 @@ struct CommitTransactionRequest {
}
};
static inline int getBytes( CommitTransactionRequest const& r ) {
static inline int getBytes( CommitTransactionRequest const& r ) {
// SOMEDAY: Optimize
//return r.arena.getSize(); // NOT correct because arena can be shared!
int total = sizeof(r);
for(auto m = r.transaction.mutations.begin(); m != r.transaction.mutations.end(); ++m)
total += m->expectedSize();
total += m->expectedSize() + CLIENT_KNOBS->PROXY_COMMIT_OVERHEAD_BYTES;
for(auto i = r.transaction.read_conflict_ranges.begin(); i != r.transaction.read_conflict_ranges.end(); ++i)
total += i->expectedSize();
for(auto i = r.transaction.write_conflict_ranges.begin(); i != r.transaction.write_conflict_ranges.end(); ++i)

View File

@ -73,38 +73,40 @@ static void initTLSOptions() {
static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");
static const Key CLIENT_LATENCY_INFO_CTR_PREFIX = LiteralStringRef("client_latency_counter/");
Reference<LocationInfo> LocationInfo::getInterface( DatabaseContext *cx, std::vector<StorageServerInterface> const& alternatives, LocalityData const& clientLocality ) {
std::vector<UID> handles;
for( auto const& alternative : alternatives )
handles.push_back( alternative.getVersion.getEndpoint().token ); // getVersion here was a random choice
std::sort( handles.begin(), handles.end() );
ASSERT( handles.size() );
Reference<StorageServerInfo> StorageServerInfo::getInterface( DatabaseContext *cx, StorageServerInterface const& ssi, LocalityData const& locality ) {
auto it = cx->server_interf.find( ssi.id() );
if( it != cx->server_interf.end() ) {
if(it->second->interf.getVersion.getEndpoint().token != ssi.getVersion.getEndpoint().token) {
if(it->second->interf.locality == ssi.locality) {
//FIXME: load balance holds pointers to individual members of the interface, and this assignment will swap out the object they are
// pointing to. This is technically correct, but is very unnatural. We may want to refactor load balance to take an AsyncVar<Reference<Interface>>
// so that it is notified when the interface changes.
it->second->interf = ssi;
} else {
it->second->notifyContextDestroyed();
Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
cx->server_interf[ ssi.id() ] = loc.getPtr();
return loc;
}
}
auto it = cx->ssid_locationInfo.find( handles );
if( it != cx->ssid_locationInfo.end() ) {
return Reference<LocationInfo>::addRef( it->second );
return Reference<StorageServerInfo>::addRef( it->second );
}
Reference<LocationInfo> loc( new LocationInfo(cx, alternatives, clientLocality) );
cx->ssid_locationInfo[ handles ] = loc.getPtr();
Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
cx->server_interf[ ssi.id() ] = loc.getPtr();
return loc;
}
void LocationInfo::notifyContextDestroyed() {
void StorageServerInfo::notifyContextDestroyed() {
cx = NULL;
}
LocationInfo::~LocationInfo() {
StorageServerInfo::~StorageServerInfo() {
if( cx ) {
std::vector<UID> handles;
for( auto const& alternative : getAlternatives() )
handles.push_back( alternative.v.getVersion.getEndpoint().token ); // must match above choice of UID
std::sort( handles.begin(), handles.end() );
ASSERT_ABORT( handles.size() );
auto it = cx->ssid_locationInfo.find( handles );
if( it != cx->ssid_locationInfo.end() )
cx->ssid_locationInfo.erase( it );
auto it = cx->server_interf.find( interf.id() );
if( it != cx->server_interf.end() )
cx->server_interf.erase( it );
cx = NULL;
}
}
@ -498,6 +500,7 @@ DatabaseContext::DatabaseContext(
ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo ) {
try {
state Optional<std::string> incorrectConnectionString;
loop {
OpenDatabaseRequest req;
req.knownClientInfoID = outInfo->get().id;
@ -507,11 +510,18 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
ClusterConnectionString fileConnectionString;
if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
req.issues = LiteralStringRef("incorrect_cluster_file_contents");
std::string connectionString = ccf->getConnectionString().toString();
if(ccf->canGetFilename()) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContents").detail("Filename", ccf->getFilename())
// Don't log a SevWarnAlways the first time to account for transient issues (e.g. someone else changing the file right before us)
TraceEvent(incorrectConnectionString.present() && incorrectConnectionString.get() == connectionString ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("Filename", ccf->getFilename())
.detail("ConnectionStringFromFile", fileConnectionString.toString())
.detail("CurrentConnectionString", ccf->getConnectionString().toString());
.detail("CurrentConnectionString", connectionString);
}
incorrectConnectionString = connectionString;
}
else {
incorrectConnectionString = Optional<std::string>();
}
choose {
@ -549,9 +559,9 @@ Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, F
DatabaseContext::~DatabaseContext() {
monitorMasterProxiesInfoChange.cancel();
for(auto it = ssid_locationInfo.begin(); it != ssid_locationInfo.end(); it = ssid_locationInfo.erase(it))
for(auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
it->second->notifyContextDestroyed();
ASSERT_ABORT( ssid_locationInfo.empty() );
ASSERT_ABORT( server_interf.empty() );
locationCache.insert( allKeys, Reference<LocationInfo>() );
}
@ -597,8 +607,14 @@ bool DatabaseContext::getCachedLocations( const KeyRangeRef& range, vector<std::
}
Reference<LocationInfo> DatabaseContext::setCachedLocation( const KeyRangeRef& keys, const vector<StorageServerInterface>& servers ) {
vector<Reference<ReferencedInterface<StorageServerInterface>>> serverRefs;
serverRefs.reserve(servers.size());
for(auto& interf : servers) {
serverRefs.push_back( StorageServerInfo::getInterface( this, interf, clientLocality ) );
}
int maxEvictionAttempts = 100, attempts = 0;
Reference<LocationInfo> loc = LocationInfo::getInterface( this, servers, clientLocality);
Reference<LocationInfo> loc = Reference<LocationInfo>( new LocationInfo(serverRefs) );
while( locationCache.size() > locationCacheSize && attempts < maxEvictionAttempts) {
TEST( true ); // NativeAPI storage server locationCache entry evicted
attempts++;
@ -657,8 +673,8 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
ssid_locationInfo.clear();
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ) );
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
case FDBDatabaseOptions::MAX_WATCHES:
@ -668,7 +684,7 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
ssid_locationInfo.clear();
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
}
@ -948,7 +964,7 @@ Reference<ProxyInfo> DatabaseContext::getMasterProxies() {
return masterProxies;
}
//Actor which will wait until the ProxyInfo returned by the DatabaseContext cx is not NULL
//Actor which will wait until the MultiInterface<MasterProxyInterface> returned by the DatabaseContext cx is not NULL
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx) {
loop{
Reference<ProxyInfo> proxies = cx->getMasterProxies();
@ -2935,31 +2951,21 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
StorageMetrics permittedError,
int shardLimit )
{
state int tooManyShardsCount = 0;
loop {
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskDataDistribution) ) );
vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskDataDistribution) ) );
if( locations.size() == shardLimit ) {
TraceEvent(!g_network->isSimulated() && ++tooManyShardsCount >= 15 ? SevWarnAlways : SevWarn, "WaitStorageMetricsPenalty")
.detail("Keys", printable(keys))
.detail("Locations", locations.size())
.detail("Limit", CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT)
.detail("JitteredSecondsOfPenitence", CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY);
wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
// make sure that the next getKeyRangeLocations() call will actually re-fetch the range
cx->invalidateCache( keys );
} else {
tooManyShardsCount = 0;
//SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
//SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
if(locations.size() < shardLimit) {
try {
Future<StorageMetrics> fx;
if (locations.size() > 1) {
StorageMetrics x = wait( waitStorageMetricsMultipleLocations( locations, min, max, permittedError ) );
return x;
fx = waitStorageMetricsMultipleLocations( locations, min, max, permittedError );
} else {
WaitMetricsRequest req( keys, min, max );
StorageMetrics x = wait( loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution ) );
return x;
fx = loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution );
}
StorageMetrics x = wait(fx);
return x;
} catch (Error& e) {
if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
TraceEvent(SevError, "WaitStorageMetricsError").error(e);
@ -2968,6 +2974,14 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
cx->invalidateCache(keys);
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
}
} else {
TraceEvent(SevWarn, "WaitStorageMetricsPenalty")
.detail("Keys", printable(keys))
.detail("Limit", CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT)
.detail("JitteredSecondsOfPenitence", CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY);
wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
// make sure that the next getKeyRangeLocations() call will actually re-fetch the range
cx->invalidateCache( keys );
}
}
}

View File

@ -21,7 +21,7 @@
#ifndef FDBCLIENT_STATUS_H
#define FDBCLIENT_STATUS_H
#include "../fdbrpc/JSONDoc.h"
#include "fdbclient/JSONDoc.h"
// Reads the entire string s as a JSON value
// Throws if no value can be parsed or if s contains data after the first JSON value

View File

@ -36,6 +36,7 @@ struct StorageServerInterface {
};
enum { LocationAwareLoadBalance = 1 };
enum { AlwaysFresh = 0 };
LocalityData locality;
UID uniqueID;

View File

@ -395,6 +395,8 @@ const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCom
const KeyRef globalKeysPrefix = LiteralStringRef("\xff/globals");
const KeyRef lastEpochEndKey = LiteralStringRef("\xff/globals/lastEpochEnd");
const KeyRef lastEpochEndPrivateKey = LiteralStringRef("\xff\xff/globals/lastEpochEnd");
const KeyRef rebootWhenDurableKey = LiteralStringRef("\xff/globals/rebootWhenDurable");
const KeyRef rebootWhenDurablePrivateKey = LiteralStringRef("\xff\xff/globals/rebootWhenDurable");
const KeyRef fastLoggingEnabled = LiteralStringRef("\xff/globals/fastLoggingEnabled");
const KeyRef fastLoggingEnabledPrivateKey = LiteralStringRef("\xff\xff/globals/fastLoggingEnabled");

View File

@ -148,6 +148,8 @@ std::pair<vector<std::pair<UID, NetworkAddress>>,vector<std::pair<UID, NetworkAd
extern const KeyRef globalKeysPrefix;
extern const KeyRef lastEpochEndKey;
extern const KeyRef lastEpochEndPrivateKey;
extern const KeyRef rebootWhenDurableKey;
extern const KeyRef rebootWhenDurablePrivateKey;
extern const KeyRef fastLoggingEnabled;
extern const KeyRef fastLoggingEnabledPrivateKey;

49
fdbclient/fdbclient.vcxproj Executable file → Normal file
View File

@ -20,13 +20,14 @@
</ProjectConfiguration>
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="KeyRangeMap.actor.cpp" />
<ActorCompiler Include="NativeAPI.actor.cpp" />
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="AsyncFileBlobStore.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="Atomic.h" />
<ClInclude Include="BackupContainer.h" />
<ClInclude Include="BackupAgent.h" />
<ClInclude Include="BlobStore.h" />
<ClInclude Include="ClientDBInfo.h" />
<ClInclude Include="ClientLogEvents.h" />
<ClInclude Include="ClientWorkerInterface.h" />
@ -34,19 +35,21 @@
<ClInclude Include="CommitTransaction.h" />
<ClInclude Include="CoordinationInterface.h" />
<ClInclude Include="DatabaseConfiguration.h" />
<ActorCompiler Include="DatabaseContext.h" />
<ClInclude Include="DatabaseContext.h" />
<ActorCompiler Include="EventTypes.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="KeyBackedTypes.h" />
<ClInclude Include="MetricLogger.h" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ClInclude Include="FailureMonitorClient.h" />
<ClInclude Include="FDBOptions.g.h" />
<ClInclude Include="FDBOptions.h" />
<ClInclude Include="FDBTypes.h" />
<ClInclude Include="HTTP.h" />
<ClInclude Include="KeyBackedTypes.h" />
<ClInclude Include="MetricLogger.h" />
<ClInclude Include="FailureMonitorClient.h" />
<ClInclude Include="IClientApi.h" />
<ClInclude Include="JsonBuilder.h" />
<ClInclude Include="JSONDoc.h" />
<ClInclude Include="json_spirit\json_spirit_error_position.h" />
<ClInclude Include="json_spirit\json_spirit_reader_template.h" />
<ClInclude Include="json_spirit\json_spirit_value.h" />
@ -54,8 +57,13 @@
<ClInclude Include="json_spirit\json_spirit_writer_template.h" />
<ClInclude Include="KeyRangeMap.h" />
<ClInclude Include="Knobs.h" />
<ClInclude Include="libb64\cdecode.h" />
<ClInclude Include="libb64\cencode.h" />
<ClInclude Include="libb64\decode.h" />
<ClInclude Include="libb64\encode.h" />
<ClInclude Include="ManagementAPI.h" />
<ClInclude Include="MasterProxyInterface.h" />
<ClInclude Include="md5\md5.h" />
<ClInclude Include="MonitorLeader.h" />
<ClInclude Include="MultiVersionAssignmentVars.h" />
<ClInclude Include="MultiVersionTransaction.h" />
@ -66,41 +74,52 @@
<ActorCompiler Include="RunTransaction.actor.h" />
<ClInclude Include="RYWIterator.h" />
<ClInclude Include="Schemas.h" />
<ClInclude Include="sha1\SHA1.h" />
<ClInclude Include="SnapshotCache.h" />
<ClInclude Include="Status.h" />
<ClInclude Include="StatusClient.h" />
<ClInclude Include="StorageServerInterface.h" />
<ClInclude Include="Subspace.h" />
<ClInclude Include="SystemData.h" />
<ClInclude Include="TaskBucket.h" />
<ClInclude Include="ThreadSafeTransaction.h" />
<ClInclude Include="Tuple.h" />
<ActorCompiler Include="VersionedMap.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="VersionedMap.h" />
<ClInclude Include="WriteMap.h" />
<ClInclude Include="Subspace.h" />
<ClInclude Include="Tuple.h" />
<ClInclude Include="JsonBuilder.h" />
<ClInclude Include="xml2json.hpp" />
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="FailureMonitorClient.actor.cpp" />
<ActorCompiler Include="ReadYourWrites.actor.cpp" />
<ActorCompiler Include="AsyncFileBlobStore.actor.cpp" />
<ClCompile Include="AutoPublicAddress.cpp" />
<ActorCompiler Include="BackupAgentBase.actor.cpp" />
<ActorCompiler Include="BackupContainer.actor.cpp" />
<ActorCompiler Include="BlobStore.actor.cpp" />
<ActorCompiler Include="DatabaseBackupAgent.actor.cpp" />
<ClCompile Include="DatabaseConfiguration.cpp" />
<ClCompile Include="AutoPublicAddress.cpp" />
<ActorCompiler Include="FailureMonitorClient.actor.cpp" />
<ClCompile Include="FDBOptions.g.cpp" />
<ActorCompiler Include="FileBackupAgent.actor.cpp" />
<ActorCompiler Include="HTTP.actor.cpp" />
<ActorCompiler Include="KeyRangeMap.actor.cpp" />
<ClCompile Include="Knobs.cpp" />
<ClCompile Include="libb64\cdecode.c" />
<ClCompile Include="libb64\cencode.c" />
<ClCompile Include="md5\md5.c" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ActorCompiler Include="MonitorLeader.actor.cpp" />
<ActorCompiler Include="ManagementAPI.actor.cpp" />
<ActorCompiler Include="MultiVersionTransaction.actor.cpp" />
<ActorCompiler Include="NativeAPI.actor.cpp" />
<ActorCompiler Include="ReadYourWrites.actor.cpp" />
<ClCompile Include="RYWIterator.cpp" />
<ActorCompiler Include="StatusClient.actor.cpp" />
<ClCompile Include="Schemas.cpp" />
<ClCompile Include="SystemData.cpp" />
<ClCompile Include="sha1\SHA1.cpp" />
<ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
<ActorCompiler Include="TaskBucket.actor.cpp" />
<ClCompile Include="Subspace.cpp" />

24
fdbrpc/xml2json.hpp → fdbclient/xml2json.hpp Executable file → Normal file
View File

@ -9,19 +9,19 @@
#include <string>
#include <cctype>
#include "rapidxml/rapidxml.hpp"
#include "rapidxml/rapidxml_utils.hpp"
#include "rapidxml/rapidxml_print.hpp"
#include "fdbclient/rapidxml/rapidxml.hpp"
#include "fdbclient/rapidxml/rapidxml_utils.hpp"
#include "fdbclient/rapidxml/rapidxml_print.hpp"
#include "rapidjson/document.h"
#include "rapidjson/prettywriter.h"
#include "rapidjson/encodedstream.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/reader.h"
#include "rapidjson/writer.h"
#include "rapidjson/filereadstream.h"
#include "rapidjson/filewritestream.h"
#include "rapidjson/error/en.h"
#include "fdbclient/rapidjson/document.h"
#include "fdbclient/rapidjson/prettywriter.h"
#include "fdbclient/rapidjson/encodedstream.h"
#include "fdbclient/rapidjson/stringbuffer.h"
#include "fdbclient/rapidjson/reader.h"
#include "fdbclient/rapidjson/writer.h"
#include "fdbclient/rapidjson/filereadstream.h"
#include "fdbclient/rapidjson/filewritestream.h"
#include "fdbclient/rapidjson/error/en.h"
/* [Start] This part is configurable */
static const char xml2json_text_additional_name[] = "#text";

View File

@ -422,7 +422,7 @@ struct Peer : NonCopyable {
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
}
else {
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).detail("PeerAddr", self->destination);
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
}
if (conn) {

View File

@ -155,9 +155,9 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis
// Keep trying to get a reply from any of servers until success or cancellation; tries to take into account
// failMon's information for load balancing and avoiding failed servers
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers
ACTOR template <class Interface, class Request>
ACTOR template <class Interface, class Request, class Multi>
Future< REPLY_TYPE(Request) > loadBalance(
Reference<MultiInterface<Interface>> alternatives,
Reference<MultiInterface<Multi>> alternatives,
RequestStream<Request> Interface::* channel,
Request request = Request(),
int taskID = TaskDefaultPromiseEndpoint,
@ -273,39 +273,37 @@ Future< REPLY_TYPE(Request) > loadBalance(
if(!stream && !firstRequest.isValid() ) {
// Everything is down! Wait for someone to be up.
if(now() - g_network->networkMetrics.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) {
g_network->networkMetrics.oldestAlternativesFailure = now();
}
double serversValidTime = alternatives->getRetrievedAt();
double minDelay = std::min(FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED - (now() - serversValidTime), FLOW_KNOBS->ALTERNATIVES_FAILURE_MIN_DELAY);
double delay = std::max(std::min((now()-g_network->networkMetrics.oldestAlternativesFailure)*FLOW_KNOBS->ALTERNATIVES_FAILURE_DELAY_RATIO, FLOW_KNOBS->ALTERNATIVES_FAILURE_MAX_DELAY), minDelay);
if(serversValidTime == ALWAYS_FRESH)
delay = ALWAYS_FRESH;
// Making this SevWarn means a lot of clutter
if(now() - g_network->networkMetrics.newestAlternativesFailure > 1 || g_random->random01() < 0.01) {
TraceEvent("AllAlternativesFailed")
.detail("Interval", FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED)
.detail("ServersValidTime", serversValidTime)
.detail("Alternatives", alternatives->description())
.detail("Delay", delay);
}
g_network->networkMetrics.newestAlternativesFailure = now();
if (delay < 0) {
throw all_alternatives_failed();
}
vector<Future<Void>> ok( alternatives->size() );
for(int i=0; i<ok.size(); i++)
for(int i=0; i<ok.size(); i++) {
ok[i] = IFailureMonitor::failureMonitor().onStateEqual( alternatives->get(i, channel).getEndpoint(), FailureStatus(false) );
choose {
when ( wait( quorum( ok, 1 ) ) ) {}
when ( wait( ::delayJittered( delay ) ) ) {
throw all_alternatives_failed();
}
if(!alternatives->alwaysFresh()) {
if(now() - g_network->networkMetrics.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) {
g_network->networkMetrics.oldestAlternativesFailure = now();
}
double delay = std::max(std::min((now()-g_network->networkMetrics.oldestAlternativesFailure)*FLOW_KNOBS->ALTERNATIVES_FAILURE_DELAY_RATIO, FLOW_KNOBS->ALTERNATIVES_FAILURE_MAX_DELAY), FLOW_KNOBS->ALTERNATIVES_FAILURE_MIN_DELAY);
// Making this SevWarn means a lot of clutter
if(now() - g_network->networkMetrics.newestAlternativesFailure > 1 || g_random->random01() < 0.01) {
TraceEvent("AllAlternativesFailed")
.detail("Interval", FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED)
.detail("Alternatives", alternatives->description())
.detail("Delay", delay);
}
g_network->networkMetrics.newestAlternativesFailure = now();
choose {
when ( wait( quorum( ok, 1 ) ) ) {}
when ( wait( ::delayJittered( delay ) ) ) {
throw all_alternatives_failed();
}
}
} else {
wait( quorum( ok, 1 ) );
}
numAttempts = 0; // now that we've got a server back, reset the backoff
@ -409,19 +407,6 @@ Future< REPLY_TYPE(Request) > loadBalance(
}
}
// This wrapper is just to help the compiler accept the coercesion to Reference<Multinterface>
template <class Interface, class Request, class Multi>
inline Future< REPLY_TYPE(Request) > loadBalance(
Reference<Multi> alternatives,
RequestStream<Request> Interface::* channel,
Request request = Request(),
int taskID = TaskDefaultPromiseEndpoint,
bool atMostOnce = false,
QueueModel* model = NULL)
{
return loadBalance( Reference<MultiInterface<Interface>>(alternatives), channel, request, taskID, atMostOnce, model );
}
#include "flow/unactorcompiler.h"
#endif

Some files were not shown because too many files have changed in this diff Show More