Merge remote-tracking branch 'upstream/release-6.2' into certificate-refresh

This commit is contained in:
Alex Miller 2020-03-04 20:25:42 -08:00
commit 595dd77ed1
23 changed files with 188 additions and 78 deletions

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.17.pkg <https://www.foundationdb.org/downloads/6.2.17/macOS/installers/FoundationDB-6.2.17.pkg>`_
* `FoundationDB-6.2.18.pkg <https://www.foundationdb.org/downloads/6.2.18/macOS/installers/FoundationDB-6.2.18.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.17-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.17/ubuntu/installers/foundationdb-clients_6.2.17-1_amd64.deb>`_
* `foundationdb-server-6.2.17-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.17/ubuntu/installers/foundationdb-server_6.2.17-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.18-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.18/ubuntu/installers/foundationdb-clients_6.2.18-1_amd64.deb>`_
* `foundationdb-server-6.2.18-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.18/ubuntu/installers/foundationdb-server_6.2.18-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.17-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.17/rhel6/installers/foundationdb-clients-6.2.17-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.17-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.17/rhel6/installers/foundationdb-server-6.2.17-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.18-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel6/installers/foundationdb-clients-6.2.18-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.18-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel6/installers/foundationdb-server-6.2.18-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.17-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.17/rhel7/installers/foundationdb-clients-6.2.17-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.17-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.17/rhel7/installers/foundationdb-server-6.2.17-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.18-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel7/installers/foundationdb-clients-6.2.18-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.18-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel7/installers/foundationdb-server-6.2.18-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.17-x64.msi <https://www.foundationdb.org/downloads/6.2.17/windows/installers/foundationdb-6.2.17-x64.msi>`_
* `foundationdb-6.2.18-x64.msi <https://www.foundationdb.org/downloads/6.2.18/windows/installers/foundationdb-6.2.18-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.17.tar.gz <https://www.foundationdb.org/downloads/6.2.17/bindings/python/foundationdb-6.2.17.tar.gz>`_
* `foundationdb-6.2.18.tar.gz <https://www.foundationdb.org/downloads/6.2.18/bindings/python/foundationdb-6.2.18.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.17.gem <https://www.foundationdb.org/downloads/6.2.17/bindings/ruby/fdb-6.2.17.gem>`_
* `fdb-6.2.18.gem <https://www.foundationdb.org/downloads/6.2.18/bindings/ruby/fdb-6.2.18.gem>`_
Java 8+
-------
* `fdb-java-6.2.17.jar <https://www.foundationdb.org/downloads/6.2.17/bindings/java/fdb-java-6.2.17.jar>`_
* `fdb-java-6.2.17-javadoc.jar <https://www.foundationdb.org/downloads/6.2.17/bindings/java/fdb-java-6.2.17-javadoc.jar>`_
* `fdb-java-6.2.18.jar <https://www.foundationdb.org/downloads/6.2.18/bindings/java/fdb-java-6.2.18.jar>`_
* `fdb-java-6.2.18-javadoc.jar <https://www.foundationdb.org/downloads/6.2.18/bindings/java/fdb-java-6.2.18-javadoc.jar>`_
Go 1.11+
--------

View File

@ -2,13 +2,38 @@
Release Notes
#############
6.2.18
======
Fixes
-----
* When configuring a cluster to usable_regions=2, data distribution would not react to machine failures while copying data to the remote region. `(PR #2774) <https://github.com/apple/foundationdb/pull/2774>`_.
* When a cluster is configured with usable_regions=2, data distribution could push a cluster into saturation by relocating too many shards simultaneously. `(PR #2776) <https://github.com/apple/foundationdb/pull/2776>`_.
* Backup could not establish TLS connections (broken in 6.2.16). `(PR #2775) <https://github.com/apple/foundationdb/pull/2775>`_.
Performance
-----------
* Improved the efficiency of establishing large numbers of network connections. `(PR #2777) <https://github.com/apple/foundationdb/pull/2777>`_.
Features
--------
* Add support for setting knobs in fdbcli. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
Other Changes
-------------
* Setting invalid knobs in backup and DR binaries is now a warning instead of an error and will not result in the application being terminated. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
6.2.17
======
Fixes
-----
* Restored the ability to set TLS configuration using environment variables. `(PR #2755) <https://github.com/apple/foundationdb/pull/2755>`_.
* Restored the ability to set TLS configuration using environment variables (broken in 6.2.16). `(PR #2755) <https://github.com/apple/foundationdb/pull/2755>`_.
6.2.16
======

View File

@ -3197,15 +3197,19 @@ int main(int argc, char* argv[]) {
if (!flowKnobs->setKnob( k->first, k->second ) &&
!clientKnobs->setKnob( k->first, k->second ))
{
fprintf(stderr, "Unrecognized knob option '%s'\n", k->first.c_str());
return FDB_EXIT_ERROR;
fprintf(stderr, "WARNING: Unrecognized knob option '%s'\n", k->first.c_str());
TraceEvent(SevWarnAlways, "UnrecognizedKnobOption").detail("Knob", printable(k->first));
}
} catch (Error& e) {
if (e.code() == error_code_invalid_option_value) {
fprintf(stderr, "Invalid value '%s' for option '%s'\n", k->second.c_str(), k->first.c_str());
return FDB_EXIT_ERROR;
fprintf(stderr, "WARNING: Invalid value '%s' for knob option '%s'\n", k->second.c_str(), k->first.c_str());
TraceEvent(SevWarnAlways, "InvalidKnobValue").detail("Knob", printable(k->first)).detail("Value", printable(k->second));
}
else {
fprintf(stderr, "ERROR: Failed to set knob option '%s': %s\n", k->first.c_str(), e.what());
TraceEvent(SevError, "FailedToSetKnob").detail("Knob", printable(k->first)).detail("Value", printable(k->second)).error(e);
throw;
}
throw;
}
}

View File

@ -70,7 +70,8 @@ enum {
OPT_NO_STATUS,
OPT_STATUS_FROM_JSON,
OPT_VERSION,
OPT_TRACE_FORMAT
OPT_TRACE_FORMAT,
OPT_KNOB
};
CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONNFILE, "-C", SO_REQ_SEP },
@ -88,12 +89,13 @@ CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONNFILE, "-C", SO_REQ_SEP },
{ OPT_VERSION, "--version", SO_NONE },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
#endif
SO_END_OF_OPTIONS };
SO_END_OF_OPTIONS };
void printAtCol(const char* text, int col) {
const char* iter = text;
@ -424,6 +426,8 @@ static void printProgramUsage(const char* name) {
#ifndef TLS_DISABLED
TLS_HELP
#endif
" --knob_KNOBNAME KNOBVALUE\n"
" Changes a knob option. KNOBNAME should be lowercase.\n"
" -v, --version Print FoundationDB CLI version information and exit.\n"
" -h, --help Display this help and exit.\n");
}
@ -2445,6 +2449,8 @@ struct CLIOptions {
std::string tlsCAPath;
std::string tlsPassword;
std::vector<std::pair<std::string, std::string>> knobs;
CLIOptions( int argc, char* argv[] )
: trace(false),
exit_timeout(0),
@ -2468,9 +2474,38 @@ struct CLIOptions {
}
if (exit_timeout && !exec.present()) {
fprintf(stderr, "ERROR: --timeout may only be specified with --exec\n");
exit_code = 1;
exit_code = FDB_EXIT_ERROR;
return;
}
delete FLOW_KNOBS;
FlowKnobs* flowKnobs = new FlowKnobs(true);
FLOW_KNOBS = flowKnobs;
delete CLIENT_KNOBS;
ClientKnobs* clientKnobs = new ClientKnobs(true);
CLIENT_KNOBS = clientKnobs;
for(auto k=knobs.begin(); k!=knobs.end(); ++k) {
try {
if (!flowKnobs->setKnob( k->first, k->second ) &&
!clientKnobs->setKnob( k->first, k->second ))
{
fprintf(stderr, "WARNING: Unrecognized knob option '%s'\n", k->first.c_str());
TraceEvent(SevWarnAlways, "UnrecognizedKnobOption").detail("Knob", printable(k->first));
}
} catch (Error& e) {
if (e.code() == error_code_invalid_option_value) {
fprintf(stderr, "WARNING: Invalid value '%s' for knob option '%s'\n", k->second.c_str(), k->first.c_str());
TraceEvent(SevWarnAlways, "InvalidKnobValue").detail("Knob", printable(k->first)).detail("Value", printable(k->second));
}
else {
fprintf(stderr, "ERROR: Failed to set knob option '%s': %s\n", k->first.c_str(), e.what());
TraceEvent(SevError, "FailedToSetKnob").detail("Knob", printable(k->first)).detail("Value", printable(k->second)).error(e);
exit_code = FDB_EXIT_ERROR;
}
}
}
}
int processArg(CSimpleOpt& args) {
@ -2537,6 +2572,16 @@ struct CLIOptions {
}
traceFormat = args.OptionArg();
break;
case OPT_KNOB: {
std::string syn = args.OptionSyntax();
if (!StringRef(syn).startsWith(LiteralStringRef("--knob_"))) {
fprintf(stderr, "ERROR: unable to parse knob option '%s'\n", syn.c_str());
return FDB_EXIT_ERROR;
}
syn = syn.substr(7);
knobs.push_back( std::make_pair( syn, args.OptionArg() ) );
break;
}
case OPT_VERSION:
printVersion();
return FDB_EXIT_SUCCESS;

View File

@ -507,6 +507,7 @@ ACTOR Future<BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<BlobS
if (service.empty())
service = b->knobs.secure_connection ? "https" : "http";
state Reference<IConnection> conn = wait(INetworkConnections::net()->connect(b->host, service, b->knobs.secure_connection ? true : false));
wait(conn->connectHandshake());
TraceEvent("BlobStoreEndpointNewConnection").suppressFor(60)
.detail("RemoteEndpoint", conn->getPeerAddress())

View File

@ -93,6 +93,7 @@ struct ClientVersionRef {
}
ClientVersionRef(Arena &arena, ClientVersionRef const& cv) : clientVersion(arena, cv.clientVersion), sourceVersion(arena, cv.sourceVersion), protocolVersion(arena, cv.protocolVersion) {}
ClientVersionRef(StringRef clientVersion, StringRef sourceVersion, StringRef protocolVersion) : clientVersion(clientVersion), sourceVersion(sourceVersion), protocolVersion(protocolVersion) {}
ClientVersionRef(std::string versionString) {
size_t index = versionString.find(",");
if(index == versionString.npos) {

View File

@ -691,7 +691,7 @@ void shrinkProxyList( ClientDBInfo& ni, std::vector<UID>& lastProxyUIDs, std::ve
}
// Leader is the process that will be elected by coordinators as the cluster controller
ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, MonitorLeaderInfo info, Standalone<VectorRef<ClientVersionRef>> supportedVersions, Key traceLogGroup) {
ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, MonitorLeaderInfo info, Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions, Key traceLogGroup) {
state ClusterConnectionString cs = info.intermediateConnFile->getConnectionString();
state vector<NetworkAddress> addrs = cs.coordinators();
state int idx = 0;
@ -707,7 +707,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
req.clusterKey = cs.clusterKey();
req.coordinators = cs.coordinators();
req.knownClientInfoID = clientInfo->get().id;
req.supportedVersions = supportedVersions;
req.supportedVersions = supportedVersions->get();
req.traceLogGroup = traceLogGroup;
ClusterConnectionString fileConnectionString;
@ -760,7 +760,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
}
}
ACTOR Future<Void> monitorProxies( Reference<AsyncVar<Reference<ClusterConnectionFile>>> connFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, Standalone<VectorRef<ClientVersionRef>> supportedVersions, Key traceLogGroup ) {
ACTOR Future<Void> monitorProxies( Reference<AsyncVar<Reference<ClusterConnectionFile>>> connFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions, Key traceLogGroup ) {
state MonitorLeaderInfo info(connFile->get());
loop {
choose {

View File

@ -57,7 +57,7 @@ Future<Void> monitorLeader( Reference<ClusterConnectionFile> const& connFile, Re
Future<Void> monitorLeaderForProxies( Value const& key, vector<NetworkAddress> const& coordinators, ClientData* const& clientData );
Future<Void> monitorProxies( Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile, Reference<AsyncVar<ClientDBInfo>> const& clientInfo, Standalone<VectorRef<ClientVersionRef>> const& supportedVersions, Key const& traceLogGroup );
Future<Void> monitorProxies( Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile, Reference<AsyncVar<ClientDBInfo>> const& clientInfo, Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> const& supportedVersions, Key const& traceLogGroup );
void shrinkProxyList( ClientDBInfo& ni, std::vector<UID>& lastProxyUIDs, std::vector<MasterProxyInterface>& lastProxies );

View File

@ -69,6 +69,21 @@ using std::pair;
NetworkOptions networkOptions;
TLSConfig tlsConfig(TLSEndpointType::CLIENT);
// The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE are located in Trace.h.
NetworkOptions::NetworkOptions()
: localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
traceFormat("xml"), slowTaskProfilingEnabled(false) {
Standalone<VectorRef<ClientVersionRef>> defaultSupportedVersions;
StringRef sourceVersion = StringRef((const uint8_t*)getHGVersion(), strlen(getHGVersion()));
std::string protocolVersionString = format("%llx", currentProtocolVersion.version());
defaultSupportedVersions.push_back_deep(defaultSupportedVersions.arena(), ClientVersionRef(LiteralStringRef(FDB_VT_VERSION), sourceVersion, protocolVersionString));
supportedVersions = ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>::from(defaultSupportedVersions);
}
static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");
static const Key CLIENT_LATENCY_INFO_CTR_PREFIX = LiteralStringRef("client_latency_counter/");
@ -943,18 +958,19 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
ASSERT(g_network);
ASSERT(value.present());
networkOptions.supportedVersions.resize(networkOptions.supportedVersions.arena(), 0);
Standalone<VectorRef<ClientVersionRef>> supportedVersions;
std::string versionString = value.get().toString();
size_t index = 0;
size_t nextIndex = 0;
while(nextIndex != versionString.npos) {
nextIndex = versionString.find(';', index);
networkOptions.supportedVersions.push_back_deep(networkOptions.supportedVersions.arena(), ClientVersionRef(versionString.substr(index, nextIndex-index)));
supportedVersions.push_back_deep(supportedVersions.arena(), ClientVersionRef(versionString.substr(index, nextIndex-index)));
index = nextIndex + 1;
}
ASSERT(networkOptions.supportedVersions.size() > 0);
ASSERT(supportedVersions.size() > 0);
networkOptions.supportedVersions->set(supportedVersions);
break;
}
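The hunk above stops mutating networkOptions.supportedVersions in place and instead builds a fresh list and publishes it through the shared ReferencedObject (see the flow.h hunk near the end of this commit). The semicolon-split loop itself is unchanged; a stand-alone sketch of it with plain std::string, using illustrative version strings in the ClientVersionRef "client,source,protocol" format:

#include <cstdio>
#include <string>
#include <vector>

int main() {
	// ClientVersionRef format is "clientVersion,sourceVersion,protocolVersion";
	// the concrete values below are illustrative.
	const std::string versionString =
	    "6.2.18,0123abcd,fdb00b062010001;6.2.17,4567ef01,fdb00b062010001";
	std::vector<std::string> versions;
	size_t index = 0;
	size_t nextIndex = 0;
	while (nextIndex != std::string::npos) {
		nextIndex = versionString.find(';', index);
		versions.push_back(versionString.substr(index, nextIndex - index));
		index = nextIndex + 1;
	}
	for (const auto& v : versions) printf("%s\n", v.c_str()); // two entries
	return versions.empty(); // mirrors ASSERT(supportedVersions.size() > 0)
}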

View File

@ -25,7 +25,6 @@
#elif !defined(FDBCLIENT_NATIVEAPI_ACTOR_H)
#define FDBCLIENT_NATIVEAPI_ACTOR_H
#include "flow/flow.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/FDBTypes.h"
@ -59,14 +58,10 @@ struct NetworkOptions {
std::string traceLogGroup;
std::string traceFormat;
Optional<bool> logClientInfo;
Standalone<VectorRef<ClientVersionRef>> supportedVersions;
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions;
bool slowTaskProfilingEnabled;
// The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE are located in Trace.h.
NetworkOptions()
: localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
traceFormat("xml"), slowTaskProfilingEnabled(false) {}
NetworkOptions();
};
class Database {

View File

@ -988,6 +988,7 @@ ACTOR static Future<Void> connectionIncoming( TransportData* self, Reference<ICo
ACTOR static Future<Void> listen( TransportData* self, NetworkAddress listenAddr ) {
state ActorCollectionNoErrors incoming; // Actors monitoring incoming connections that haven't yet been associated with a peer
state Reference<IListener> listener = INetworkConnections::net()->listen( listenAddr );
state uint64_t connectionCount = 0;
try {
loop {
Reference<IConnection> conn = wait( listener->accept() );
@ -997,7 +998,10 @@ ACTOR static Future<Void> listen( TransportData* self, NetworkAddress listenAddr
.detail("ListenAddress", listenAddr.toString());
incoming.add( connectionIncoming(self, conn) );
}
wait(delay(0) || delay(FLOW_KNOBS->CONNECTION_ACCEPT_DELAY, TaskPriority::WriteSocket));
connectionCount++;
if( connectionCount%(FLOW_KNOBS->ACCEPT_BATCH_SIZE) == 0 ) {
wait(delay(0, TaskPriority::AcceptSocket));
}
}
} catch (Error& e) {
TraceEvent(SevError, "ListenError").error(e);
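Instead of waiting up to CONNECTION_ACCEPT_DELAY after every accepted connection, the listener now yields to the run loop only once per ACCEPT_BATCH_SIZE accepts, at the new AcceptSocket priority; this is the connection-establishment efficiency improvement from the 6.2.18 release notes (PR #2777). A minimal sketch of the batching pattern outside Flow (the default of 20 comes from the flow/Knobs.cpp hunk below; the helper functions are stand-ins):

#include <cstdint>
#include <cstdio>

constexpr uint64_t ACCEPT_BATCH_SIZE = 20; // default from the flow/Knobs.cpp hunk below

// Stand-ins for accepting a connection and yielding to the event loop.
bool acceptOne() { static int n = 0; return ++n <= 45; }
void yieldToRunLoop() { printf("yield\n"); }

int main() {
	uint64_t connectionCount = 0;
	while (acceptOne()) {
		++connectionCount;
		// Yield once per batch rather than delaying after every accept,
		// which is what makes establishing many connections cheaper.
		if (connectionCount % ACCEPT_BATCH_SIZE == 0) yieldToRunLoop();
	}
	return 0; // 45 accepts -> yields after #20 and #40
}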

View File

@ -2870,7 +2870,9 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
lastWrongConfiguration = anyWrongConfiguration;
state int lastPriority = team->getPriority();
if( serversLeft < self->configuration.storageTeamSize ) {
if(team->size() == 0) {
team->setPriority( SERVER_KNOBS->PRIORITY_POPULATE_REGION );
} else if( serversLeft < self->configuration.storageTeamSize ) {
if( serversLeft == 0 )
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_0_LEFT );
else if( serversLeft == 1 )
@ -2887,10 +2889,11 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY );
}
}
else if( anyUndesired )
else if( anyUndesired ) {
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
else
} else {
team->setPriority( SERVER_KNOBS->PRIORITY_TEAM_HEALTHY );
}
if(lastPriority != team->getPriority()) {
self->priority_teams[lastPriority]--;
@ -4279,7 +4282,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, configuration.storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
vector<DDTeamCollection*> teamCollectionsPtrs;
Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );

View File

@ -204,6 +204,7 @@ Future<Void> dataDistributionQueue(
PromiseStream<Promise<int64_t>> const& getAverageShardBytes,
UID const& distributorId,
int const& teamSize,
int const& singleRegionTeamSize,
double* const& lastLimited);
//Holds the permitted size and IO Bounds for a shard

View File

@ -57,7 +57,8 @@ struct RelocateData {
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT), interval("QueuedRelocation") {}
static bool isHealthPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
return priority == SERVER_KNOBS->PRIORITY_POPULATE_REGION ||
priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
priority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
priority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT ||
priority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT ||
@ -285,29 +286,30 @@ struct Busyness {
};
// find the "workFactor" for this, were it launched now
int getWorkFactor( RelocateData const& relocation ) {
// Avoid the divide by 0!
ASSERT( relocation.src.size() );
int getWorkFactor( RelocateData const& relocation, int singleRegionTeamSize ) {
if( relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT )
return WORK_FULL_UTILIZATION / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
else if( relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT )
return WORK_FULL_UTILIZATION / 2 / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
else // for now we assume that any message at a lower priority can best be assumed to have a full team left for work
return WORK_FULL_UTILIZATION / relocation.src.size() / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
return WORK_FULL_UTILIZATION / singleRegionTeamSize / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
}
// Data movement's resource control: Do not overload source servers used for the RelocateData
// return true if servers are not too busy to launch the relocation
bool canLaunch( RelocateData & relocation, int teamSize, std::map<UID, Busyness> & busymap,
bool canLaunch( RelocateData & relocation, int teamSize, int singleRegionTeamSize, std::map<UID, Busyness> & busymap,
std::vector<RelocateData> cancellableRelocations ) {
// assert this has not already been launched
ASSERT( relocation.workFactor == 0 );
ASSERT( relocation.src.size() != 0 );
ASSERT( teamSize >= singleRegionTeamSize );
// find the "workFactor" for this, were it launched now
int workFactor = getWorkFactor( relocation );
int neededServers = std::max( 1, (int)relocation.src.size() - teamSize + 1 );
int workFactor = getWorkFactor( relocation, singleRegionTeamSize );
int neededServers = std::min<int>( relocation.src.size(), teamSize - singleRegionTeamSize + 1 );
if(SERVER_KNOBS->USE_OLD_NEEDED_SERVERS) {
neededServers = std::max( 1, (int)relocation.src.size() - teamSize + 1 );
}
// see if each of the SS can launch this task
for( int i = 0; i < relocation.src.size(); i++ ) {
// For each source server for this relocation, copy and modify its busyness to reflect work that WOULD be cancelled
@ -328,9 +330,9 @@ bool canLaunch( RelocateData & relocation, int teamSize, std::map<UID, Busyness>
}
// update busyness for each server
void launch( RelocateData & relocation, std::map<UID, Busyness> & busymap ) {
void launch( RelocateData & relocation, std::map<UID, Busyness> & busymap, int singleRegionTeamSize ) {
// if we are here this means that we can launch and should adjust all the work the servers can do
relocation.workFactor = getWorkFactor( relocation );
relocation.workFactor = getWorkFactor( relocation, singleRegionTeamSize );
for( int i = 0; i < relocation.src.size(); i++ )
busymap[ relocation.src[i] ].addWork( relocation.priority, relocation.workFactor );
}
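The neededServers change appears to be the mechanism behind the saturation fix in the 6.2.18 release notes above: with usable_regions=2, teamSize covers both regions while the sources of a populate-region move are only one region's team. A worked comparison under triple replication (teamSize = 6, singleRegionTeamSize = 3; values illustrative):

#include <algorithm>
#include <cstdio>

int main() {
	const int teamSize = 6;             // both regions under triple replication
	const int singleRegionTeamSize = 3; // one region's team
	const int srcSize = 3;              // sources of a populate-region move

	// Old: max(1, 3 - 6 + 1) == 1, so a single non-busy source was enough to
	// launch, letting many moves pile onto the same few servers.
	const int oldNeeded = std::max(1, srcSize - teamSize + 1);

	// New: min(3, 6 - 3 + 1) == 3, so every source must have spare capacity,
	// throttling concurrent relocations.
	const int newNeeded = std::min(srcSize, teamSize - singleRegionTeamSize + 1);

	printf("needed servers: old=%d new=%d\n", oldNeeded, newNeeded);
	return 0;
}
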
@ -359,6 +361,7 @@ struct DDQueueData {
int queuedRelocations;
int64_t bytesWritten;
int teamSize;
int singleRegionTeamSize;
std::map<UID, Busyness> busymap;
@ -394,7 +397,7 @@ struct DDQueueData {
// ensure a team remover will not start before the previous one finishes removing a team and moving its data away
// NOTE: split and merge shard moves have higher priority. If they had to wait for unhealthyRelocations = 0,
// deadlock may happen: a split/merge shard move waits for unhealthyRelocations while itself blocking team_redundant.
if (healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
if (healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
unhealthyRelocations++;
rawProcessingUnhealthy->set(true);
@ -402,7 +405,7 @@ struct DDQueueData {
priority_relocations[priority]++;
}
void finishRelocation(int priority, int healthPriority) {
if (healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
if (healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT ||
healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT || healthPriority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
unhealthyRelocations--;
ASSERT(unhealthyRelocations >= 0);
@ -415,10 +418,10 @@ struct DDQueueData {
DDQueueData( UID mid, MoveKeysLock lock, Database cx, std::vector<TeamCollectionInterface> teamCollections,
Reference<ShardsAffectedByTeamFailure> sABTF, PromiseStream<Promise<int64_t>> getAverageShardBytes,
int teamSize, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited ) :
int teamSize, int singleRegionTeamSize, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited ) :
activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ),
shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), distributorId( mid ), lock( lock ),
cx( cx ), teamSize( teamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
cx( cx ), teamSize( teamSize ), singleRegionTeamSize( singleRegionTeamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited),
suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar<bool>(false) ) {}
@ -815,7 +818,7 @@ struct DDQueueData {
// Data movement avoids overloading source servers in moving data.
// SOMEDAY: the list of source servers may be outdated since they were fetched when the work was put in the queue
// FIXME: we need spare capacity even when we're just going to be cancelling work via TEAM_HEALTHY
if( !canLaunch( rd, teamSize, busymap, cancellableRelocations ) ) {
if( !canLaunch( rd, teamSize, singleRegionTeamSize, busymap, cancellableRelocations ) ) {
//logRelocation( rd, "SkippingQueuedRelocation" );
continue;
}
@ -853,7 +856,7 @@ struct DDQueueData {
RelocateData& rrs = inFlight.rangeContaining(ranges[r].begin)->value();
rrs.keys = ranges[r];
launch( rrs, busymap );
launch( rrs, busymap, singleRegionTeamSize );
activeRelocations++;
startRelocation(rrs.priority, rrs.healthPriority);
inFlightActors.insert( rrs.keys, dataDistributionRelocator( this, rrs ) );
@ -927,7 +930,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
while( tciIndex < self->teamCollections.size() ) {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, false, inflightPenalty);
req.completeSources = rd.completeSources;
@ -1409,9 +1412,10 @@ ACTOR Future<Void> dataDistributionQueue(
PromiseStream<Promise<int64_t>> getAverageShardBytes,
UID distributorId,
int teamSize,
int singleRegionTeamSize,
double* lastLimited)
{
state DDQueueData self( distributorId, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, lastLimited );
state DDQueueData self( distributorId, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, singleRegionTeamSize, output, input, getShardMetrics, lastLimited );
state std::set<UID> serversToLaunchFrom;
state KeyRange keysToLaunchFrom;
state RelocateData launchData;
@ -1510,6 +1514,7 @@ ACTOR Future<Void> dataDistributionQueue(
.detail( "PriorityTeamContainsUndesiredServer", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER] )
.detail( "PriorityTeamRedundant", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT] )
.detail( "PriorityMergeShard", self.priority_relocations[SERVER_KNOBS->PRIORITY_MERGE_SHARD] )
.detail( "PriorityPopulateRegion", self.priority_relocations[SERVER_KNOBS->PRIORITY_POPULATE_REGION] )
.detail( "PriorityTeamUnhealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY] )
.detail( "PriorityTeam2Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_2_LEFT] )
.detail( "PriorityTeam1Left", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_1_LEFT] )

View File

@ -104,6 +104,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( INFLIGHT_PENALTY_HEALTHY, 1.0 );
init( INFLIGHT_PENALTY_UNHEALTHY, 500.0 );
init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );
init( USE_OLD_NEEDED_SERVERS, false );
init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
@ -112,6 +113,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
init( PRIORITY_TEAM_REDUNDANT, 200 );
init( PRIORITY_MERGE_SHARD, 340 );
init( PRIORITY_POPULATE_REGION, 600 );
init( PRIORITY_TEAM_UNHEALTHY, 700 );
init( PRIORITY_TEAM_2_LEFT, 709 );
init( PRIORITY_TEAM_1_LEFT, 800 );

View File

@ -104,7 +104,8 @@ public:
double INFLIGHT_PENALTY_REDUNDANT;
double INFLIGHT_PENALTY_UNHEALTHY;
double INFLIGHT_PENALTY_ONE_LEFT;
bool USE_OLD_NEEDED_SERVERS;
// Higher priorities are executed first
// Priority/100 is the "priority group"/"superpriority". Priority inversion
// is possible within but not between priority groups; fewer priority groups
@ -117,6 +118,7 @@ public:
int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
int PRIORITY_TEAM_REDUNDANT;
int PRIORITY_MERGE_SHARD;
int PRIORITY_POPULATE_REGION;
int PRIORITY_TEAM_UNHEALTHY;
int PRIORITY_TEAM_2_LEFT;
int PRIORITY_TEAM_1_LEFT;
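Per the priority-group comment above (group = priority/100), the new PRIORITY_POPULATE_REGION (600) gets a group of its own between shard merges (340) and the unhealthy-team band (700/709). A quick check using only values copied from the Knobs.cpp hunk above:

#include <cassert>

int main() {
	// Values copied from the Knobs.cpp hunk above.
	const int PRIORITY_MERGE_SHARD     = 340;
	const int PRIORITY_POPULATE_REGION = 600;
	const int PRIORITY_TEAM_UNHEALTHY  = 700;
	const int PRIORITY_TEAM_2_LEFT     = 709;
	const int PRIORITY_TEAM_1_LEFT     = 800;

	// Same group (7): inversion between these two is possible.
	assert(PRIORITY_TEAM_UNHEALTHY / 100 == PRIORITY_TEAM_2_LEFT / 100);
	// The new populate-region priority has group 6 to itself: strictly above
	// shard merges and strictly below any unhealthy-team work.
	assert(PRIORITY_MERGE_SHARD / 100 < PRIORITY_POPULATE_REGION / 100);
	assert(PRIORITY_POPULATE_REGION / 100 < PRIORITY_TEAM_1_LEFT / 100);
	return 0;
}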

View File

@ -1430,29 +1430,30 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker,
stateSectionObj["description"] = "No replicas remain of some data";
stateSectionObj["min_replicas_remaining"] = 0;
replicas = 0;
}
else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_1_LEFT) {
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_1_LEFT) {
stateSectionObj["healthy"] = false;
stateSectionObj["name"] = "healing";
stateSectionObj["description"] = "Only one replica remains of some data";
stateSectionObj["min_replicas_remaining"] = 1;
replicas = 1;
}
else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) {
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) {
stateSectionObj["healthy"] = false;
stateSectionObj["name"] = "healing";
stateSectionObj["description"] = "Only two replicas remain of some data";
stateSectionObj["min_replicas_remaining"] = 2;
replicas = 2;
}
else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY) {
stateSectionObj["healthy"] = false;
stateSectionObj["name"] = "healing";
stateSectionObj["description"] = "Restoring replication factor";
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_POPULATE_REGION) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy_populating_region";
stateSectionObj["description"] = "Populating remote region";
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_MERGE_SHARD) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "healthy_repartitioning";
stateSectionObj["description"] = "Repartitioning.";
stateSectionObj["description"] = "Repartitioning";
} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT) {
stateSectionObj["healthy"] = true;
stateSectionObj["name"] = "optimizing_team_collections";

View File

@ -1475,9 +1475,11 @@ int main(int argc, char* argv[]) {
}
} catch (Error& e) {
if (e.code() == error_code_invalid_option_value) {
fprintf(stderr, "WARNING: Invalid value '%s' for option '%s'\n", k->second.c_str(), k->first.c_str());
fprintf(stderr, "WARNING: Invalid value '%s' for knob option '%s'\n", k->second.c_str(), k->first.c_str());
TraceEvent(SevWarnAlways, "InvalidKnobValue").detail("Knob", printable(k->first)).detail("Value", printable(k->second));
} else {
fprintf(stderr, "ERROR: Failed to set knob option '%s': %s\n", k->first.c_str(), e.what());
TraceEvent(SevError, "FailedToSetKnob").detail("Knob", printable(k->first)).detail("Value", printable(k->second)).error(e);
throw;
}
}

View File

@ -67,7 +67,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( MAX_RECONNECTION_TIME, 0.5 );
init( RECONNECTION_TIME_GROWTH_RATE, 1.2 );
init( RECONNECTION_RESET_TIME, 5.0 );
init( CONNECTION_ACCEPT_DELAY, 0.5 );
init( ACCEPT_BATCH_SIZE, 20 );
init( USE_OBJECT_SERIALIZER, 1 );
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );

View File

@ -87,7 +87,7 @@ public:
double MAX_RECONNECTION_TIME;
double RECONNECTION_TIME_GROWTH_RATE;
double RECONNECTION_RESET_TIME;
double CONNECTION_ACCEPT_DELAY;
int ACCEPT_BATCH_SIZE;
int USE_OBJECT_SERIALIZER;
int TLS_CERT_REFRESH_DELAY_SECONDS;

View File

@ -451,12 +451,13 @@ private:
};
class Listener : public IListener, ReferenceCounted<Listener> {
boost::asio::io_context& io_service;
NetworkAddress listenAddress;
tcp::acceptor acceptor;
public:
Listener( boost::asio::io_service& io_service, NetworkAddress listenAddress )
: listenAddress(listenAddress), acceptor( io_service, tcpEndpoint( listenAddress ) )
Listener( boost::asio::io_context& io_service, NetworkAddress listenAddress )
: io_service(io_service), listenAddress(listenAddress), acceptor( io_service, tcpEndpoint( listenAddress ) )
{
platform::setCloseOnExec(acceptor.native_handle());
}
@ -473,7 +474,7 @@ public:
private:
ACTOR static Future<Reference<IConnection>> doAccept( Listener* self ) {
state Reference<Connection> conn( new Connection( self->acceptor.get_io_service() ) );
state Reference<Connection> conn( new Connection( self->io_service ) );
state tcp::acceptor::endpoint_type peer_endpoint;
try {
BindPromise p("N2_AcceptError", UID());
@ -785,13 +786,14 @@ private:
};
class SSLListener : public IListener, ReferenceCounted<SSLListener> {
boost::asio::io_context& io_service;
NetworkAddress listenAddress;
tcp::acceptor acceptor;
boost::asio::ssl::context* context;
public:
SSLListener( boost::asio::io_service& io_service, boost::asio::ssl::context* context, NetworkAddress listenAddress )
: listenAddress(listenAddress), acceptor( io_service, tcpEndpoint( listenAddress ) ), context(context)
SSLListener( boost::asio::io_context& io_service, boost::asio::ssl::context* context, NetworkAddress listenAddress )
: io_service(io_service), listenAddress(listenAddress), acceptor( io_service, tcpEndpoint( listenAddress ) ), context(context)
{
platform::setCloseOnExec(acceptor.native_handle());
}
@ -808,7 +810,7 @@ public:
private:
ACTOR static Future<Reference<IConnection>> doAccept( SSLListener* self ) {
state Reference<SSLConnection> conn( new SSLConnection( self->acceptor.get_io_service(), *self->context) );
state Reference<SSLConnection> conn( new SSLConnection( self->io_service, *self->context) );
state tcp::acceptor::endpoint_type peer_endpoint;
try {
BindPromise p("N2_AcceptError", UID());
@ -860,7 +862,7 @@ Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
tlsInitialized(false),
tlsConfig(tlsConfig)
#ifndef TLS_DISABLED
,sslContext(boost::asio::ssl::context(boost::asio::ssl::context::tlsv12))
,sslContext(boost::asio::ssl::context(boost::asio::ssl::context::tls))
#endif
{
@ -1025,7 +1027,6 @@ void Net2::initTLS() {
sslContext.use_private_key(boost::asio::buffer(keyBytes.data(), keyBytes.size()), boost::asio::ssl::context::pem);
}
} catch(boost::system::system_error e) {
fprintf(stderr, "Error initializing TLS: %s\n", e.what());
TraceEvent("Net2TLSInitError").detail("Message", e.what());
throw tls_error();
}
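Constructing the SSL context with the generic tls method instead of tlsv12 lets peers negotiate the highest TLS version both sides support rather than pinning TLS 1.2. A minimal comparison of the two constructions (assuming Boost.Asio's documented ssl::context methods; link against OpenSSL):

#include <boost/asio/ssl.hpp>

int main() {
	// Pins the protocol to TLS 1.2 only.
	boost::asio::ssl::context v12(boost::asio::ssl::context::tlsv12);
	// Generic method: negotiates the highest version both peers support;
	// individual versions can still be disabled via set_options().
	boost::asio::ssl::context any(boost::asio::ssl::context::tls);
	(void)v12;
	(void)any;
	return 0;
}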

View File

@ -653,6 +653,7 @@ class ReferencedObject : NonCopyable, public ReferenceCounted<ReferencedObject<V
ReferencedObject() : value() {}
ReferencedObject(V const& v) : value(v) {}
ReferencedObject(ReferencedObject&& r) : value(std::move(r.value)) {}
void operator=(ReferencedObject&& r) {
value = std::move(r.value);
}
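This ReferencedObject is what the MonitorLeader and NativeAPI hunks above now pass around as Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>>: a single reference-counted copy of the supported-versions list shared by every monitorProxies generation, swapped wholesale when SUPPORTED_CLIENT_VERSIONS is set. A rough std::shared_ptr analogue (illustrative stand-ins, not the FDB types):

#include <cstdio>
#include <memory>
#include <string>
#include <vector>

template <class V>
struct ReferencedObject {
	V value;
	V& get() { return value; }
	static std::shared_ptr<ReferencedObject<V>> from(V const& v) {
		auto r = std::make_shared<ReferencedObject<V>>();
		r->value = v; // copied once here, then shared by reference count
		return r;
	}
};

int main() {
	using Versions = std::vector<std::string>;
	auto supported = ReferencedObject<Versions>::from({"6.2.18,src,proto"});
	auto readerA = supported; // cheap: bumps a refcount, no vector copy
	auto readerB = supported;
	printf("%ld readers of %zu version(s)\n", supported.use_count(), readerA->get().size());
	return readerB->get().empty() ? 1 : 0;
}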

View File

@ -43,6 +43,7 @@ enum class TaskPriority {
DiskIOComplete = 9150,
LoadBalancedEndpoint = 9000,
ReadSocket = 9000,
AcceptSocket = 8950,
Handshake = 8900,
CoordinationReply = 8810,
Coordination = 8800,
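The new AcceptSocket priority slots directly between ReadSocket (9000) and Handshake (8900), so the batched accept loop added in the FlowTransport hunk runs below socket reads but ahead of TLS handshakes. A one-line sanity check over the enum values above:

// Illustrative check of the ordering implied by the values above: batched
// accepts run below socket reads but ahead of TLS handshakes.
static_assert(9000 > 8950 && 8950 > 8900,
              "AcceptSocket sits between ReadSocket and Handshake");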