merge release 6.2 into master

This commit is contained in:
Evan Tschannen 2020-03-17 12:51:47 -07:00
commit e08f0201f1
46 changed files with 614 additions and 269 deletions

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.18.pkg <https://www.foundationdb.org/downloads/6.2.18/macOS/installers/FoundationDB-6.2.18.pkg>`_
* `FoundationDB-6.2.19.pkg <https://www.foundationdb.org/downloads/6.2.19/macOS/installers/FoundationDB-6.2.19.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.18-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.18/ubuntu/installers/foundationdb-clients_6.2.18-1_amd64.deb>`_
* `foundationdb-server-6.2.18-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.18/ubuntu/installers/foundationdb-server_6.2.18-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.19-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.19/ubuntu/installers/foundationdb-clients_6.2.19-1_amd64.deb>`_
* `foundationdb-server-6.2.19-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.19/ubuntu/installers/foundationdb-server_6.2.19-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.18-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel6/installers/foundationdb-clients-6.2.18-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.18-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel6/installers/foundationdb-server-6.2.18-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.19-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel6/installers/foundationdb-clients-6.2.19-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.19-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel6/installers/foundationdb-server-6.2.19-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.18-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel7/installers/foundationdb-clients-6.2.18-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.18-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.18/rhel7/installers/foundationdb-server-6.2.18-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.19-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel7/installers/foundationdb-clients-6.2.19-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.19-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel7/installers/foundationdb-server-6.2.19-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.18-x64.msi <https://www.foundationdb.org/downloads/6.2.18/windows/installers/foundationdb-6.2.18-x64.msi>`_
* `foundationdb-6.2.19-x64.msi <https://www.foundationdb.org/downloads/6.2.19/windows/installers/foundationdb-6.2.19-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
* `foundationdb-6.2.18.tar.gz <https://www.foundationdb.org/downloads/6.2.18/bindings/python/foundationdb-6.2.18.tar.gz>`_
* `foundationdb-6.2.19.tar.gz <https://www.foundationdb.org/downloads/6.2.19/bindings/python/foundationdb-6.2.19.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.18.gem <https://www.foundationdb.org/downloads/6.2.18/bindings/ruby/fdb-6.2.18.gem>`_
* `fdb-6.2.19.gem <https://www.foundationdb.org/downloads/6.2.19/bindings/ruby/fdb-6.2.19.gem>`_
Java 8+
-------
* `fdb-java-6.2.18.jar <https://www.foundationdb.org/downloads/6.2.18/bindings/java/fdb-java-6.2.18.jar>`_
* `fdb-java-6.2.18-javadoc.jar <https://www.foundationdb.org/downloads/6.2.18/bindings/java/fdb-java-6.2.18-javadoc.jar>`_
* `fdb-java-6.2.19.jar <https://www.foundationdb.org/downloads/6.2.19/bindings/java/fdb-java-6.2.19.jar>`_
* `fdb-java-6.2.19-javadoc.jar <https://www.foundationdb.org/downloads/6.2.19/bindings/java/fdb-java-6.2.19-javadoc.jar>`_
Go 1.11+
--------

View File

@ -190,6 +190,9 @@
},
"megabits_received":{
"hz":0.0
},
"tls_policy_failures":{
"hz":0.0
}
},
"run_loop_busy":0.2 // fraction of time the run loop was busy
@ -404,6 +407,7 @@
},
"required_logs":3,
"missing_logs":"7f8d623d0cb9966e",
"active_generations":1,
"description":"Recovery complete."
},
"workload":{
@ -422,6 +426,16 @@
"hz":0.0,
"counter":0,
"roughness":0.0
},
"location_requests":{ // measures number of outgoing key server location responses
"hz":0.0,
"counter":0,
"roughness":0.0
},
"memory_errors":{ // measures number of proxy_memory_limit_exceeded errors
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"bytes":{ // measures number of logical bytes read/written (ignoring replication factor and overhead on disk). Perfectly spaced operations will have a roughness of 1.0. Randomly spaced (Poisson-distributed) operations will have a roughness of 2.0, with increased bunching resulting in increased values. Higher roughness can result in increased latency due to increased queuing.

View File

@ -2,6 +2,33 @@
Release Notes
#############
6.2.19
======
Fixes
-----
* Protect the proxies from running out of memory when bombarded with requests from clients. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* One process with a ``proxy`` class would not become the first proxy when put with other ``stateless`` class processes. `(PR #2819) <https://github.com/apple/foundationdb/pull/2819>`_.
* If a transaction log stalled on a disk operation during recruitment the cluster would become unavailable until the process died. `(PR #2815) <https://github.com/apple/foundationdb/pull/2815>`_.
* Avoid recruiting satellite transaction logs when ``usable_regions=1``. `(PR #2813) <https://github.com/apple/foundationdb/pull/2813>`_.
* Prevent the cluster from having too many active generations as a safety measure against repeated failures. `(PR #2814) <https://github.com/apple/foundationdb/pull/2814>`_.
* ``fdbcli`` status JSON could become truncated because of unprintable characters. `(PR #2807) <https://github.com/apple/foundationdb/pull/2807>`_.
* The data distributor used too much CPU in large clusters (broken in 6.2.16). `(PR #2806) <https://github.com/apple/foundationdb/pull/2806>`_.
Status
------
* Added ``cluster.workload.operations.memory_errors`` to measure the number of requests rejected by the proxies because the memory limit has been exceeded. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* Added ``cluster.workload.operations.location_requests`` to measure the number of outgoing key server location responses from the proxies. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* Added ``cluster.recovery_state.active_generations`` to track the number of generations for which the cluster still requires transaction logs. `(PR #2814) <https://github.com/apple/foundationdb/pull/2814>`_.
* Added ``network.tls_policy_failures`` to the ``processes`` section to record the number of TLS policy failures each process has observed. `(PR #2811) <https://github.com/apple/foundationdb/pull/2811>`_.
Features
--------
* Added ``--debug-tls`` as a command line argument to ``fdbcli`` to help diagnose TLS issues. `(PR #2810) <https://github.com/apple/foundationdb/pull/2810>`_.
6.2.18
======
@ -22,7 +49,7 @@ Performance
Features
--------
* Add support for setting knobs to modify the behavior of fdbcli. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
* Add support for setting knobs to modify the behavior of ``fdbcli``. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
Other Changes
-------------

View File

@ -71,7 +71,8 @@ enum {
OPT_STATUS_FROM_JSON,
OPT_VERSION,
OPT_TRACE_FORMAT,
OPT_KNOB
OPT_KNOB,
OPT_DEBUG_TLS
};
CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONNFILE, "-C", SO_REQ_SEP },
@ -91,6 +92,7 @@ CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONNFILE, "-C", SO_REQ_SEP },
{ OPT_VERSION, "-v", SO_NONE },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
{ OPT_DEBUG_TLS, "--debug-tls", SO_NONE },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
@ -429,6 +431,8 @@ static void printProgramUsage(const char* name) {
#endif
" --knob_KNOBNAME KNOBVALUE\n"
" Changes a knob option. KNOBNAME should be lowercase.\n"
" --debug-tls Prints the TLS configuration and certificate chain, then exits.\n"
" Useful in reporting and diagnosing TLS issues.\n"
" -v, --version Print FoundationDB CLI version information and exit.\n"
" -h, --help Display this help and exit.\n");
}
@ -2480,6 +2484,7 @@ struct CLIOptions {
Optional<std::string> exec;
bool initialStatusCheck = true;
bool cliHints = true;
bool debugTLS = false;
std::string tlsCertPath;
std::string tlsKeyPath;
std::string tlsVerifyPeers;
@ -2577,36 +2582,36 @@ struct CLIOptions {
#ifndef TLS_DISABLED
// TLS Options
case TLSConfig::OPT_TLS_PLUGIN:
args.OptionArg();
break;
case TLSConfig::OPT_TLS_CERTIFICATES:
tlsCertPath = args.OptionArg();
break;
case TLSConfig::OPT_TLS_CA_FILE:
tlsCAPath = args.OptionArg();
break;
case TLSConfig::OPT_TLS_KEY:
tlsKeyPath = args.OptionArg();
break;
case TLSConfig::OPT_TLS_PASSWORD:
tlsPassword = args.OptionArg();
break;
case TLSConfig::OPT_TLS_VERIFY_PEERS:
tlsVerifyPeers = args.OptionArg();
break;
case TLSConfig::OPT_TLS_PLUGIN:
args.OptionArg();
break;
case TLSConfig::OPT_TLS_CERTIFICATES:
tlsCertPath = args.OptionArg();
break;
case TLSConfig::OPT_TLS_CA_FILE:
tlsCAPath = args.OptionArg();
break;
case TLSConfig::OPT_TLS_KEY:
tlsKeyPath = args.OptionArg();
break;
case TLSConfig::OPT_TLS_PASSWORD:
tlsPassword = args.OptionArg();
break;
case TLSConfig::OPT_TLS_VERIFY_PEERS:
tlsVerifyPeers = args.OptionArg();
break;
#endif
case OPT_HELP:
printProgramUsage(program_name.c_str());
return 0;
case OPT_STATUS_FROM_JSON:
return printStatusFromJSON(args.OptionArg());
case OPT_TRACE_FORMAT:
if (!validateTraceFormat(args.OptionArg())) {
fprintf(stderr, "WARNING: Unrecognized trace format `%s'\n", args.OptionArg());
}
traceFormat = args.OptionArg();
break;
case OPT_HELP:
printProgramUsage(program_name.c_str());
return 0;
case OPT_STATUS_FROM_JSON:
return printStatusFromJSON(args.OptionArg());
case OPT_TRACE_FORMAT:
if (!validateTraceFormat(args.OptionArg())) {
fprintf(stderr, "WARNING: Unrecognized trace format `%s'\n", args.OptionArg());
}
traceFormat = args.OptionArg();
break;
case OPT_KNOB: {
std::string syn = args.OptionSyntax();
if (!StringRef(syn).startsWith(LiteralStringRef("--knob_"))) {
@ -2617,11 +2622,14 @@ struct CLIOptions {
knobs.push_back( std::make_pair( syn, args.OptionArg() ) );
break;
}
case OPT_VERSION:
printVersion();
return FDB_EXIT_SUCCESS;
}
return -1;
case OPT_DEBUG_TLS:
debugTLS = true;
break;
case OPT_VERSION:
printVersion();
return FDB_EXIT_SUCCESS;
}
return -1;
}
};
@ -3850,6 +3858,30 @@ int main(int argc, char **argv) {
return 1;
}
if (opt.debugTLS) {
#ifndef TLS_DISABLED
// Backdoor into NativeAPI's tlsConfig, which is where the above network option settings ended up.
extern TLSConfig tlsConfig;
printf("TLS Configuration:\n");
printf("\tCertificate Path: %s\n", tlsConfig.getCertificatePathSync().c_str());
printf("\tKey Path: %s\n", tlsConfig.getKeyPathSync().c_str());
printf("\tCA Path: %s\n", tlsConfig.getCAPathSync().c_str());
try {
LoadedTLSConfig loaded = tlsConfig.loadSync();
printf("\tPassword: %s\n", loaded.getPassword().empty() ? "Not configured" : "Exists, but redacted");
printf("\n");
loaded.print(stdout);
} catch (Error& e) {
printf("ERROR: %s (%d)\n", e.what(), e.code());
printf("Use --log and look at the trace logs for more detailed information on the failure.\n");
return 1;
}
#else
printf("This fdbcli was built with TLS disabled.\n");
#endif
return 0;
}
try {
setupNetwork();
Future<int> cliFuture = runCli(opt);

View File

@ -107,7 +107,7 @@ struct DatabaseConfiguration {
int expectedLogSets( Optional<Key> dcId ) const {
int result = 1;
if(dcId.present() && getRegion(dcId.get()).satelliteTLogReplicationFactor > 0) {
if(dcId.present() && getRegion(dcId.get()).satelliteTLogReplicationFactor > 0 && usableRegions > 1) {
result++;
}

View File

@ -41,6 +41,10 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( CLIENT_FAILURE_TIMEOUT_DELAY, FAILURE_MIN_DELAY );
init( FAILURE_EMERGENCY_DELAY, 30.0 );
init( FAILURE_MAX_GENERATIONS, 10 );
init( RECOVERY_DELAY_START_GENERATION, 70 );
init( RECOVERY_DELAY_SECONDS_PER_GENERATION, 60.0 );
init( MAX_GENERATIONS, 100 );
init( MAX_GENERATIONS_OVERRIDE, 0 );
init( COORDINATOR_RECONNECTION_DELAY, 1.0 );
init( CLIENT_EXAMPLE_AMOUNT, 20 );

View File

@ -40,6 +40,10 @@ public:
double CLIENT_FAILURE_TIMEOUT_DELAY;
double FAILURE_EMERGENCY_DELAY;
double FAILURE_MAX_GENERATIONS;
double RECOVERY_DELAY_START_GENERATION;
double RECOVERY_DELAY_SECONDS_PER_GENERATION;
double MAX_GENERATIONS;
double MAX_GENERATIONS_OVERRIDE;
double COORDINATOR_RECONNECTION_DELAY;
int CLIENT_EXAMPLE_AMOUNT;

View File

@ -68,11 +68,11 @@ struct MasterProxyInterface {
}
void initEndpoints() {
getConsistentReadVersion.getEndpoint(TaskPriority::ProxyGetConsistentReadVersion);
getConsistentReadVersion.getEndpoint(TaskPriority::ReadSocket);
getRawCommittedVersion.getEndpoint(TaskPriority::ProxyGetRawCommittedVersion);
commit.getEndpoint(TaskPriority::ProxyCommitDispatcher);
commit.getEndpoint(TaskPriority::ReadSocket);
getStorageServerRejoinInfo.getEndpoint(TaskPriority::ProxyStorageRejoin);
//getKeyServersLocations.getEndpoint(TaskProxyGetKeyServersLocations); //do not increase the priority of these requests, because clients can bring down the cluster with too many of these messages.
getKeyServersLocations.getEndpoint(TaskPriority::ReadSocket); //priority lowered to TaskPriority::DefaultEndpoint on the proxy
}
};

View File

@ -223,7 +223,7 @@ template <> void delref( DatabaseContext* ptr ) { ptr->delref(); }
ACTOR Future<Void> databaseLogger( DatabaseContext *cx ) {
state double lastLogged = 0;
loop {
wait(delay(CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID));
wait(delay(CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskPriority::FlushTrace));
TraceEvent ev("TransactionMetrics", cx->dbId);
ev.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
@ -3101,7 +3101,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream< std::p
if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
send_batch = true;
else if (!timeout.isValid())
timeout = delay(batchTime, TaskPriority::ProxyGetConsistentReadVersion);
timeout = delay(batchTime, TaskPriority::GetConsistentReadVersion);
}
when(wait(timeout.isValid() ? timeout : Never())) { send_batch = true; }
// dynamic batching monitors reply latencies
@ -3136,6 +3136,9 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Re
cx->GRVLatencies.addSample(latency);
if (trLogInfo)
trLogInfo->addLog(FdbClientLogEvents::EventGetVersion_V2(startTime, latency, flags & GetReadVersionRequest::FLAG_PRIORITY_MASK));
if (rep.version == 1 && rep.locked) {
throw proxy_memory_limit_exceeded();
}
if(rep.locked && !lockAware)
throw database_locked();

View File

@ -1156,7 +1156,7 @@ Future<Version> ReadYourWritesTransaction::getReadVersion() {
Optional<Value> getValueFromJSON(StatusObject statusObj) {
try {
Value output = StringRef(json_spirit::write_string(json_spirit::mValue(statusObj), json_spirit::Output_options::raw_utf8).c_str());
Value output = StringRef(json_spirit::write_string(json_spirit::mValue(statusObj), json_spirit::Output_options::none));
return output;
}
catch (std::exception& e){

View File

@ -213,6 +213,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"megabits_received":{
"hz":0.0
},
"tls_policy_failures":{
"hz":0.0
}
},
"run_loop_busy":0.2
@ -434,6 +437,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"required_logs":3,
"missing_logs":"7f8d623d0cb9966e",
"active_generations":1,
"description":"Recovery complete."
},
"workload":{
@ -452,6 +456,16 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"hz":0.0,
"counter":0,
"roughness":0.0
},
"location_requests":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"memory_errors":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"bytes":{

View File

@ -25,6 +25,7 @@
#include "flow/IThreadPool.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/TLSConfig.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
void forceLinkFlowTests() {}
@ -244,6 +245,10 @@ struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
virtual void run() { return baseNetwork->run(); }
virtual void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) { return baseNetwork->getDiskBytes(directory,free,total); }
virtual bool isAddressOnThisHost(NetworkAddress const& addr) { return baseNetwork->isAddressOnThisHost(addr); }
virtual const TLSConfig& getTLSConfig() {
static TLSConfig emptyConfig;
return emptyConfig;
}
};
struct NonserializableThing {};

View File

@ -381,6 +381,7 @@ public:
bool operator == (const RequestStream<T>& rhs) const { return queue == rhs.queue; }
bool isEmpty() const { return !queue->isReady(); }
uint32_t size() const { return queue->size(); }
private:
NetNotifiedQueue<T>* queue;

View File

@ -866,6 +866,10 @@ public:
}
}
}
virtual const TLSConfig& getTLSConfig() {
static TLSConfig emptyConfig;
return emptyConfig;
}
virtual void stop() { isStopped = true; }
virtual bool isSimulated() const { return true; }

View File

@ -536,8 +536,12 @@ public:
vector<WorkerDetails> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, std::pair<vector<WorkerDetails>,vector<WorkerDetails>>> fitness_workers;
vector<WorkerDetails> results;
if (amount <= 0)
if(minWorker.present()) {
results.push_back(minWorker.get().worker);
}
if (amount <= results.size()) {
return results;
}
for( auto& it : id_worker ) {
auto fitness = it.second.details.processClass.machineClassFitness( role );
@ -734,7 +738,7 @@ public:
}
std::vector<WorkerDetails> satelliteLogs;
if(region.satelliteTLogReplicationFactor > 0) {
if(region.satelliteTLogReplicationFactor > 0 && req.configuration.usableRegions > 1) {
satelliteLogs = getWorkersForSatelliteLogs( req.configuration, region, remoteRegion, id_used, result.satelliteFallback );
for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].interf);
@ -744,11 +748,8 @@ public:
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, req.configuration, id_used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, req.configuration, id_used );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, id_used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, id_used, first_resolver );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies(), req.configuration, id_used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers(), req.configuration, id_used, first_resolver );
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].interf);
@ -779,7 +780,7 @@ public:
if( !goodRecruitmentTime.isReady() &&
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId), ProcessClass::TLog).betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog)) ) ||
( region.satelliteTLogReplicationFactor > 0 && req.configuration.usableRegions > 1 && RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId), ProcessClass::TLog).betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog)) ) ||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies(), ProcessClass::Proxy).betterCount(RoleFitness(proxies, ProcessClass::Proxy)) ||
RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers(), ProcessClass::Resolver).betterCount(RoleFitness(resolvers, ProcessClass::Resolver)) ) ) {
return operation_failed();
@ -887,11 +888,8 @@ public:
auto first_resolver = getWorkerForRoleInDatacenter( dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, req.configuration, used );
auto first_proxy = getWorkerForRoleInDatacenter( dcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, req.configuration, used );
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies()-1, req.configuration, used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers()-1, req.configuration, used, first_resolver );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies(), req.configuration, used, first_proxy );
auto resolvers = getWorkersForRoleInDatacenter( dcId, ProcessClass::Resolver, req.configuration.getDesiredResolvers(), req.configuration, used, first_resolver );
RoleFitnessPair fitness( RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver) );
@ -966,7 +964,7 @@ public:
std::set<Optional<Key>> primaryDC;
primaryDC.insert(regions[0].dcId);
getWorkersForTlogs(db.config, db.config.tLogReplicationFactor, db.config.getDesiredLogs(), db.config.tLogPolicy, id_used, true, primaryDC);
if(regions[0].satelliteTLogReplicationFactor > 0) {
if(regions[0].satelliteTLogReplicationFactor > 0 && db.config.usableRegions > 1) {
bool satelliteFallback = false;
getWorkersForSatelliteLogs(db.config, regions[0], regions[1], id_used, satelliteFallback, true);
}
@ -1151,7 +1149,7 @@ public:
RoleFitness oldSatelliteTLogFit(satellite_tlogs, ProcessClass::TLog);
bool newSatelliteFallback = false;
auto newSatelliteTLogs = region.satelliteTLogReplicationFactor > 0 ? getWorkersForSatelliteLogs(db.config, region, remoteRegion, id_used, newSatelliteFallback, true) : satellite_tlogs;
auto newSatelliteTLogs = (region.satelliteTLogReplicationFactor > 0 && db.config.usableRegions > 1) ? getWorkersForSatelliteLogs(db.config, region, remoteRegion, id_used, newSatelliteFallback, true) : satellite_tlogs;
RoleFitness newSatelliteTLogFit(newSatelliteTLogs, ProcessClass::TLog);
std::map<Optional<Key>,int32_t> satellite_priority;
@ -1218,10 +1216,8 @@ public:
auto first_resolver = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
auto first_proxy = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
auto proxies = getWorkersForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, db.config.getDesiredProxies()-1, db.config, id_used, first_proxy, true );
auto resolvers = getWorkersForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, db.config.getDesiredResolvers()-1, db.config, id_used, first_resolver, true );
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto proxies = getWorkersForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, db.config.getDesiredProxies(), db.config, id_used, first_proxy, true );
auto resolvers = getWorkersForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, db.config.getDesiredResolvers(), db.config, id_used, first_resolver, true );
RoleFitnessPair newInFit(RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver));
if(oldInFit.proxy.betterFitness(newInFit.proxy) || oldInFit.resolver.betterFitness(newInFit.resolver)) {

View File

@ -647,6 +647,9 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
double lastMedianAvailableSpaceUpdate;
// clang-format on
int lowestUtilizationTeam;
int highestUtilizationTeam;
void resetLocalitySet() {
storageServerSet = Reference<LocalitySet>(new LocalityMap<UID>());
LocalityMap<UID>* storageServerMap = (LocalityMap<UID>*) storageServerSet.getPtr();
@ -690,7 +693,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
optimalTeamCount(0), recruitingStream(0), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
unhealthyServers(0), includedDCs(includedDCs), otherTrackedDCs(otherTrackedDCs),
zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary), medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO),
lastMedianAvailableSpaceUpdate(0), processingUnhealthy(processingUnhealthy) {
lastMedianAvailableSpaceUpdate(0), processingUnhealthy(processingUnhealthy), lowestUtilizationTeam(0), highestUtilizationTeam(0) {
if(!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", distributorId)
.detail( "State", "Inactive" )
@ -828,18 +831,29 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
if( req.wantsTrueBest ) {
ASSERT( !bestOption.present() );
auto &startIndex = req.preferLowerUtilization ? self->lowestUtilizationTeam : self->highestUtilizationTeam;
if(startIndex >= self->teams.size()) {
startIndex = 0;
}
int bestIndex = startIndex;
for( int i = 0; i < self->teams.size(); i++ ) {
if (self->teams[i]->isHealthy() &&
(!req.preferLowerUtilization || self->teams[i]->hasHealthyAvailableSpace(self->medianAvailableSpace)) &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(self->teams[i]->getServerIDs(), self->primary)).size() > 0))
int currentIndex = (startIndex + i) % self->teams.size();
if (self->teams[currentIndex]->isHealthy() &&
(!req.preferLowerUtilization || self->teams[currentIndex]->hasHealthyAvailableSpace(self->medianAvailableSpace)))
{
int64_t loadBytes = self->teams[i]->getLoadBytes(true, req.inflightPenalty);
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
int64_t loadBytes = self->teams[currentIndex]->getLoadBytes(true, req.inflightPenalty);
if((!bestOption.present() || (req.preferLowerUtilization && loadBytes < bestLoadBytes) || (!req.preferLowerUtilization && loadBytes > bestLoadBytes)) &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->hasShards(ShardsAffectedByTeamFailure::Team(self->teams[currentIndex]->getServerIDs(), self->primary))))
{
bestLoadBytes = loadBytes;
bestOption = self->teams[i];
bestOption = self->teams[currentIndex];
bestIndex = currentIndex;
}
}
}
startIndex = bestIndex;
}
else {
int nTries = 0;
@ -848,8 +862,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<IDataDistributionTeam> dest = deterministicRandom()->randomChoice(self->teams);
bool ok = dest->isHealthy() &&
(!req.preferLowerUtilization || dest->hasHealthyAvailableSpace(self->medianAvailableSpace)) &&
(!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->getShardsFor(ShardsAffectedByTeamFailure::Team(dest->getServerIDs(), self->primary)).size() > 0);
(!req.preferLowerUtilization || dest->hasHealthyAvailableSpace(self->medianAvailableSpace));
for(int i=0; ok && i<randomTeams.size(); i++) {
if (randomTeams[i]->getServerIDs() == dest->getServerIDs()) {
@ -858,6 +871,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
ok = ok && (!req.teamMustHaveShards || self->shardsAffectedByTeamFailure->hasShards(ShardsAffectedByTeamFailure::Team(dest->getServerIDs(), self->primary)));
if (ok)
randomTeams.push_back( dest );
else
@ -2850,7 +2865,7 @@ bool teamContainsFailedServer(DDTeamCollection* self, Reference<TCTeamInfo> team
ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> team, bool badTeam, bool redundantTeam) {
state int lastServersLeft = team->size();
state bool lastAnyUndesired = false;
state bool logTeamEvents = g_network->isSimulated() || !badTeam;
state bool logTeamEvents = g_network->isSimulated() || !badTeam || team->size() <= self->configuration.storageTeamSize;
state bool lastReady = false;
state bool lastHealthy;
state bool lastOptimal;
@ -2893,6 +2908,10 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
}
if(serversLeft == 0) {
logTeamEvents = true;
}
// Failed server should not trigger DD if SS failures are set to be ignored
if (!badTeam && self->healthyZone.get().present() && (self->healthyZone.get().get() == ignoreSSFailuresZoneString)) {
ASSERT_WE_THINK(serversLeft == self->configuration.storageTeamSize);
@ -3037,7 +3056,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
// t is the team in primary DC or the remote DC
auto& t = j < teams.first.size() ? teams.first[j] : teams.second[j-teams.first.size()];
if( !t.servers.size() ) {
maxPriority = SERVER_KNOBS->PRIORITY_TEAM_0_LEFT;
maxPriority = std::max( maxPriority, SERVER_KNOBS->PRIORITY_POPULATE_REGION );
break;
}
@ -4165,7 +4184,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("HighestPriority", highestPriority)
.trackLatest(self->primary ? "TotalDataInFlight" : "TotalDataInFlightRemote");
loggingTrigger = delay( SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL );
loggingTrigger = delay( SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace );
}
when( wait( self->serverTrackerErrorOut.getFuture() ) ) {} // Propagate errors from storageServerTracker
when( wait( error ) ) {}

View File

@ -149,6 +149,7 @@ public:
int getNumberOfShards( UID ssID );
vector<KeyRange> getShardsFor( Team team );
bool hasShards(Team team);
//The first element of the pair is either the source for non-moving shards or the destination team for in-flight shards
//The second element of the pair is all previous sources for in-flight shards

View File

@ -1500,7 +1500,7 @@ ACTOR Future<Void> dataDistributionQueue(
Promise<int64_t> req;
getAverageShardBytes.send( req );
recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL);
recordMetrics = delay(SERVER_KNOBS->DD_QUEUE_LOGGING_INTERVAL, TaskPriority::FlushTrace);
int highestPriorityRelocation = 0;
for( auto it = self.priority_relocations.begin(); it != self.priority_relocations.end(); ++it ) {

View File

@ -813,7 +813,7 @@ ACTOR Future<Void> dataDistributionTracker(
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest( "DDTrackerStats" );
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL);
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
}
when( GetMetricsRequest req = waitNext( getShardMetrics.getFuture() ) ) {
self.sizeChanges.add( fetchShardMetrics( &self, req ) );
@ -833,6 +833,11 @@ vector<KeyRange> ShardsAffectedByTeamFailure::getShardsFor( Team team ) {
return r;
}
// Returns true iff at least one shard is currently assigned to the given team.
bool ShardsAffectedByTeamFailure::hasShards(Team team) {
	// team_shards is ordered on (team, range) pairs, so the smallest possible
	// key for this team is (team, empty range). If the first entry at or past
	// that key belongs to the same team, the team owns at least one shard.
	auto const first = team_shards.lower_bound(std::pair<Team, KeyRange>(team, KeyRangeRef()));
	if (first == team_shards.end()) {
		return false;
	}
	return first->first == team;
}
// Returns the number of shards currently tracked for the given storage server.
// NOTE(review): if storageServerShards is a std::map (declared elsewhere), the
// operator[] lookup default-inserts a zero-valued entry for an unknown ssID,
// mutating the container from a logically read-only query — confirm callers
// only pass known servers, or that the spurious insertion is acceptable.
int ShardsAffectedByTeamFailure::getNumberOfShards( UID ssID ) {
return storageServerShards[ssID];
}

View File

@ -83,6 +83,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( MAX_CACHE_VERSIONS, 10e6 );
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
init( TXS_POPPED_MAX_DELAY, 1.0 ); if ( randomize && BUGGIFY ) TXS_POPPED_MAX_DELAY = deterministicRandom()->random01();
init( TLOG_MAX_CREATE_DURATION, 10.0 );
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
@ -302,6 +303,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, 0.001 );
init( START_TRANSACTION_MAX_TRANSACTIONS_TO_START, 100000 );
init( START_TRANSACTION_MAX_REQUESTS_TO_START, 10000 );
init( START_TRANSACTION_MAX_QUEUE_SIZE, 1e6 );
init( KEY_LOCATION_MAX_QUEUE_SIZE, 1e6 );
init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005;
init( COMMIT_TRANSACTION_BATCH_INTERVAL_MIN, 0.001 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_MIN = 0.1;

View File

@ -85,6 +85,7 @@ public:
double TLOG_DEGRADED_DURATION;
int64_t MAX_CACHE_VERSIONS;
double TXS_POPPED_MAX_DELAY;
double TLOG_MAX_CREATE_DURATION;
// Data distribution queue
double HEALTH_POLL_TIME;
@ -247,6 +248,8 @@ public:
double START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL;
double START_TRANSACTION_MAX_TRANSACTIONS_TO_START;
int START_TRANSACTION_MAX_REQUESTS_TO_START;
int START_TRANSACTION_MAX_QUEUE_SIZE;
int KEY_LOCATION_MAX_QUEUE_SIZE;
double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE;
double COMMIT_TRANSACTION_BATCH_INTERVAL_MIN;

View File

@ -51,7 +51,7 @@ struct MasterInterface {
}
void initEndpoints() {
getCommitVersion.getEndpoint( TaskPriority::ProxyGetConsistentReadVersion );
getCommitVersion.getEndpoint( TaskPriority::GetConsistentReadVersion );
tlogRejoin.getEndpoint( TaskPriority::MasterTLogRejoin );
}
};

View File

@ -50,18 +50,19 @@
struct ProxyStats {
CounterCollection cc;
Counter txnRequestIn, txnRequestOut, txnRequestErrors;
Counter txnStartIn, txnStartOut, txnStartBatch;
Counter txnSystemPriorityStartIn, txnSystemPriorityStartOut;
Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess;
Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut, txnCommitOutSuccess, txnCommitErrors;
Counter txnConflicts;
Counter txnThrottled;
Counter commitBatchIn, commitBatchOut;
Counter mutationBytes;
Counter mutations;
Counter conflictRanges;
Counter keyServerLocationRequests;
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
Version lastCommitVersionAssigned;
LatencyBands commitLatencyBands;
@ -69,23 +70,25 @@ struct ProxyStats {
Future<Void> logger;
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnConflicts("TxnConflicts", cc),
txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc), commitBatchOut("CommitBatchOut", cc),
mutationBytes("MutationBytes", cc), mutations("Mutations", cc), conflictRanges("ConflictRanges", cc),
keyServerLocationRequests("KeyServerLocationRequests", cc), lastCommitVersionAssigned(0),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
specialCounter(cc, "Version", [pVersion](){return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
@ -167,38 +170,55 @@ struct TransactionRateInfo {
};
ACTOR Future<Void> queueTransactionStartRequests(
Reference<AsyncVar<ServerDBInfo>> db,
std::priority_queue<std::pair<GetReadVersionRequest, int64_t>,
std::vector<std::pair<GetReadVersionRequest, int64_t>>>* transactionQueue,
FutureStream<GetReadVersionRequest> readVersionRequests, PromiseStream<Void> GRVTimer, double* lastGRVTime,
double* GRVBatchTime, FutureStream<double> replyTimes, ProxyStats* stats, TransactionRateInfo* batchRateInfo) {
state int64_t counter = 0;
Reference<AsyncVar<ServerDBInfo>> db,
Deque<GetReadVersionRequest> *systemQueue,
Deque<GetReadVersionRequest> *defaultQueue,
Deque<GetReadVersionRequest> *batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
ProxyStats* stats, TransactionRateInfo* batchRateInfo)
{
loop choose{
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
if (req.debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
}
if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
stats->txnSystemPriorityStartIn += req.transactionCount;
} else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
stats->txnDefaultPriorityStartIn += req.transactionCount;
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
if( stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() > SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE ) {
++stats->txnRequestErrors;
//FIXME: send an error instead of giving an unreadable version when the client can support the error: req.reply.sendError(proxy_memory_limit_exceeded());
GetReadVersionReply rep;
rep.version = 1;
rep.locked = true;
req.reply.send(rep);
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
} else {
// Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.proxies.size(), 1);
if (batchRateInfo->rate <= (1.0 / proxiesCount)) {
req.reply.sendError(batch_transaction_throttled());
stats->txnThrottled += req.transactionCount;
continue;
}
stats->txnBatchPriorityStartIn += req.transactionCount;
}
stats->txnStartIn += req.transactionCount;
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
if (transactionQueue->empty()) {
forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer));
if (systemQueue->empty() && defaultQueue->empty() && batchQueue->empty()) {
forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer));
}
++stats->txnRequestIn;
stats->txnStartIn += req.transactionCount;
if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
stats->txnSystemPriorityStartIn += req.transactionCount;
systemQueue->push_back(req);
} else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
stats->txnDefaultPriorityStartIn += req.transactionCount;
defaultQueue->push_back(req);
} else {
// Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.proxies.size(), 1);
if (batchRateInfo->rate <= (1.0 / proxiesCount)) {
req.reply.sendError(batch_transaction_throttled());
stats->txnThrottled += req.transactionCount;
continue;
}
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
}
}
transactionQueue->push(std::make_pair(req, counter--));
}
// dynamic batching monitors reply latencies
when(double reply_latency = waitNext(replyTimes)) {
@ -453,10 +473,12 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
while(!timeout.isReady() && !(batch.size() == SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX || batchBytes >= desiredBytes)) {
choose{
when(CommitTransactionRequest req = waitNext(in)) {
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
int bytes = getBytes(req);
// Drop requests if memory is under severe pressure
if(commitData->commitBatchesMemBytesCount + bytes > memBytesLimit) {
++commitData->stats.txnCommitErrors;
req.reply.sendError(proxy_memory_limit_exceeded());
TraceEvent(SevWarnAlways, "ProxyCommitBatchMemoryThresholdExceeded").suppressFor(60).detail("MemBytesCount", commitData->commitBatchesMemBytesCount).detail("MemLimit", memBytesLimit);
continue;
@ -623,6 +645,7 @@ ACTOR Future<Void> commitBatch(
vector<CommitTransactionRequest> trs,
int currentBatchMemBytesCount)
{
//WARNING: this code is run at a high priority (until the first delay(0)), so it needs to do as little work as possible
state int64_t localBatchNumber = ++self->localCommitBatchesStarted;
state LogPushData toCommit(self->logSystem);
state double t1 = now();
@ -1255,6 +1278,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
} else {
request.reply.send(reply);
}
++stats->txnRequestOut;
}
return Void();
@ -1276,12 +1300,14 @@ ACTOR static Future<Void> transactionStarter(
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state std::priority_queue<std::pair<GetReadVersionRequest, int64_t>, std::vector<std::pair<GetReadVersionRequest, int64_t>>> transactionQueue;
state Deque<GetReadVersionRequest> systemQueue;
state Deque<GetReadVersionRequest> defaultQueue;
state Deque<GetReadVersionRequest> batchQueue;
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(queueTransactionStartRequests(db, &transactionQueue, proxy.getConsistentReadVersion.getFuture(),
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(),
&commitData->stats, &batchRateInfo));
@ -1318,8 +1344,20 @@ ACTOR static Future<Void> transactionStarter(
Optional<UID> debugID;
int requestsToStart = 0;
while (!transactionQueue.empty() && requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
auto& req = transactionQueue.top().first;
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
Deque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
transactionQueue = &systemQueue;
} else if(!defaultQueue.empty()) {
transactionQueue = &defaultQueue;
} else if(!batchQueue.empty()) {
transactionQueue = &batchQueue;
} else {
break;
}
auto& req = transactionQueue->front();
int tc = req.transactionCount;
if (req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT &&
@ -1344,12 +1382,13 @@ ACTOR static Future<Void> transactionStarter(
batchPriTransactionsStarted[req.flags & 1] += tc;
start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
transactionQueue.pop();
transactionQueue->pop_front();
requestsToStart++;
}
if (!transactionQueue.empty())
if (!systemQueue.empty() || !defaultQueue.empty() || !batchQueue.empty()) {
forwardPromise(GRVTimer, delayJittered(SERVER_KNOBS->START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, TaskPriority::ProxyGRVTimer));
}
/*TraceEvent("GRVBatch", proxy.id())
.detail("Elapsed", elapsed)
@ -1388,55 +1427,66 @@ ACTOR static Future<Void> transactionStarter(
}
}
ACTOR static Future<Void> readRequestServer( MasterProxyInterface proxy, ProxyCommitData* commitData ) {
// Implement read-only parts of the proxy interface
ACTOR static Future<Void> doKeyServerLocationRequest( GetKeyServerLocationsRequest req, ProxyCommitData* commitData ) {
// We can't respond to these requests until we have valid txnStateStore
wait(commitData->validState.getFuture());
wait(delay(0, TaskPriority::DefaultEndpoint));
TraceEvent("ProxyReadyForReads", proxy.id());
loop {
GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
++commitData->stats.keyServerLocationRequests;
GetKeyServerLocationsReply rep;
if(!req.end.present()) {
auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin) : commitData->keyInfo.rangeContaining(req.begin);
GetKeyServerLocationsReply rep;
if(!req.end.present()) {
auto r = req.reverse ? commitData->keyInfo.rangeContainingKeyBefore(req.begin) : commitData->keyInfo.rangeContaining(req.begin);
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().src_info.size());
for(auto& it : r.value().src_info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
} else if(!req.reverse) {
int count = 0;
for(auto r = commitData->keyInfo.rangeContaining(req.begin); r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get(); ++r) {
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().src_info.size());
for(auto& it : r.value().src_info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
} else if(!req.reverse) {
int count = 0;
for(auto r = commitData->keyInfo.rangeContaining(req.begin); r != commitData->keyInfo.ranges().end() && count < req.limit && r.begin() < req.end.get(); ++r) {
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().src_info.size());
for(auto& it : r.value().src_info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
count++;
}
} else {
int count = 0;
auto r = commitData->keyInfo.rangeContainingKeyBefore(req.end.get());
while( count < req.limit && req.begin < r.end() ) {
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().src_info.size());
for(auto& it : r.value().src_info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
if(r == commitData->keyInfo.ranges().begin()) {
break;
}
count++;
--r;
}
count++;
}
} else {
int count = 0;
auto r = commitData->keyInfo.rangeContainingKeyBefore(req.end.get());
while( count < req.limit && req.begin < r.end() ) {
vector<StorageServerInterface> ssis;
ssis.reserve(r.value().src_info.size());
for(auto& it : r.value().src_info) {
ssis.push_back(it->interf);
}
rep.results.push_back(std::make_pair(r.range(), ssis));
if(r == commitData->keyInfo.ranges().begin()) {
break;
}
count++;
--r;
}
}
req.reply.send(rep);
++commitData->stats.keyServerLocationOut;
return Void();
}
ACTOR static Future<Void> readRequestServer( MasterProxyInterface proxy, PromiseStream<Future<Void>> addActor, ProxyCommitData* commitData ) {
loop {
GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
if(req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && //Always do data distribution requests
commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() > SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
++commitData->stats.keyServerLocationErrors;
req.reply.sendError(proxy_memory_limit_exceeded());
TraceEvent(SevWarnAlways, "ProxyLocationRequestThresholdExceeded").suppressFor(60);
} else {
++commitData->stats.keyServerLocationIn;
addActor.send(doKeyServerLocationRequest(req, commitData));
}
req.reply.send(rep);
wait(yield());
}
}
@ -1444,6 +1494,8 @@ ACTOR static Future<Void> rejoinServer( MasterProxyInterface proxy, ProxyCommitD
// We can't respond to these requests until we have valid txnStateStore
wait(commitData->validState.getFuture());
TraceEvent("ProxyReadyForReads", proxy.id());
loop {
GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture());
if (commitData->txnStateStore->readValue(serverListKeyFor(req.id)).get().present()) {
@ -1755,7 +1807,7 @@ ACTOR Future<Void> masterProxyServerCore(
addActor.send(monitorRemoteCommitted(&commitData));
addActor.send(transactionStarter(proxy, commitData.db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(readRequestServer(proxy, &commitData));
addActor.send(readRequestServer(proxy, addActor, &commitData));
addActor.send(rejoinServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
@ -1789,6 +1841,7 @@ ACTOR Future<Void> masterProxyServerCore(
}
when(wait(onError)) {}
when(std::pair<vector<CommitTransactionRequest>, int> batchedRequests = waitNext(batchedCommits.getFuture())) {
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
const vector<CommitTransactionRequest> &trs = batchedRequests.first;
int batchBytes = batchedRequests.second;
//TraceEvent("MasterProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);

View File

@ -1432,7 +1432,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
state IKeyValueStore *storage = self->persistentData;
wait(storage->init());
wait( ioTimeoutError( storage->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
storage->set( persistFormat );
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(logData->version.get(), Unversioned()) ) );
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
@ -1448,7 +1448,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
}
TraceEvent("TLogInitCommit", logData->logId);
wait( self->persistentData->commit() );
wait( ioTimeoutError( self->persistentData->commit(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
return Void();
}
@ -2334,7 +2334,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
if(restoreFromDisk) {
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
} else {
wait( checkEmptyQueue(&self) && checkRecovered(&self) );
wait( ioTimeoutError( checkEmptyQueue(&self) && checkRecovered(&self), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
}
//Disk errors need a chance to kill this actor.

View File

@ -776,6 +776,10 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
megabits_received.setKeyRawNumber("hz", processMetrics.getValue("MbpsReceived"));
networkObj["megabits_received"] = megabits_received;
JsonBuilderObject tls_policy_failures;
tls_policy_failures.setKeyRawNumber("hz", processMetrics.getValue("TLSPolicyFailures"));
networkObj["tls_policy_failures"] = tls_policy_failures;
statusObj["network"] = networkObj;
memoryObj.setKeyRawNumber("used_bytes", processMetrics.getValue("Memory"));
@ -964,8 +968,9 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(WorkerDetails
state JsonBuilderObject message;
try {
state Future<TraceEventFields> activeGens = timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryGenerations") ) ), 1.0);
TraceEventFields md = wait( timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = md.getInt("StatusCode");
int mStatusCode = md.getInt("StatusCode");
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -989,6 +994,12 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(WorkerDetails
// TODO: time_in_recovery: 0.5
// time_in_state: 0.1
TraceEventFields md = wait(activeGens);
if(md.size()) {
int activeGenerations = md.getInt("ActiveGenerations");
message["active_generations"] = activeGenerations;
}
} catch (Error &e){
if (e.code() == error_code_actor_cancelled)
throw;
@ -1576,7 +1587,7 @@ static int getExtraTLogEligibleZones(const vector<WorkerDetails>& workers, const
for(auto& region : configuration.regions) {
int eligible = dcId_zone[region.dcId].size() - std::max(configuration.remoteTLogReplicationFactor, std::max(configuration.tLogReplicationFactor, configuration.storageTeamSize) );
//FIXME: does not take into account fallback satellite policies
if(region.satelliteTLogReplicationFactor > 0) {
if(region.satelliteTLogReplicationFactor > 0 && configuration.usableRegions > 1) {
int totalSatelliteEligible = 0;
for(auto& sat : region.satellites) {
totalSatelliteEligible += dcId_zone[sat.dcId].size();
@ -1650,6 +1661,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
StatusCounter txnDefaultPriorityStartOut;
StatusCounter txnBatchPriorityStartOut;
StatusCounter txnCommitOutSuccess;
StatusCounter txnKeyLocationOut;
StatusCounter txnMemoryErrors;
for (auto &ps : proxyStats) {
mutations.updateValues( StatusCounter(ps.getValue("Mutations")) );
@ -1660,9 +1673,15 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
txnDefaultPriorityStartOut.updateValues(StatusCounter(ps.getValue("TxnDefaultPriorityStartOut")));
txnBatchPriorityStartOut.updateValues(StatusCounter(ps.getValue("TxnBatchPriorityStartOut")));
txnCommitOutSuccess.updateValues( StatusCounter(ps.getValue("TxnCommitOutSuccess")) );
txnKeyLocationOut.updateValues( StatusCounter(ps.getValue("KeyServerLocationOut")) );
txnMemoryErrors.updateValues( StatusCounter(ps.getValue("TxnRequestErrors")) );
txnMemoryErrors.updateValues( StatusCounter(ps.getValue("KeyServerLocationErrors")) );
txnMemoryErrors.updateValues( StatusCounter(ps.getValue("TxnCommitErrors")) );
}
operationsObj["writes"] = mutations.getStatus();
operationsObj["location_requests"] = txnKeyLocationOut.getStatus();
operationsObj["memory_errors"] = txnMemoryErrors.getStatus();
bytesObj["written"] = mutationBytes.getStatus();
JsonBuilderObject transactions;

View File

@ -1845,7 +1845,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
// PERSIST: Initial setup of persistentData for a brand new tLog for a new database
state IKeyValueStore *storage = self->persistentData;
wait(storage->init());
wait( ioTimeoutError( storage->init(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
storage->set( persistFormat );
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(logData->version.get(), Unversioned()) ) );
storage->set( KeyValueRef( BinaryWriter::toValue(logData->logId,Unversioned()).withPrefix(persistKnownCommittedVersionKeys.begin), BinaryWriter::toValue(logData->knownCommittedVersion, Unversioned()) ) );
@ -1863,7 +1863,7 @@ ACTOR Future<Void> initPersistentState( TLogData* self, Reference<LogData> logDa
}
TraceEvent("TLogInitCommit", logData->logId);
wait( self->persistentData->commit() );
wait( ioTimeoutError( self->persistentData->commit(), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
return Void();
}
@ -2791,7 +2791,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
if(restoreFromDisk) {
wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
} else {
wait( checkEmptyQueue(&self) && checkRecovered(&self) );
wait( ioTimeoutError( checkEmptyQueue(&self) && checkRecovered(&self), SERVER_KNOBS->TLOG_MAX_CREATE_DURATION ) );
}
//Disk errors need a chance to kill this actor.

View File

@ -2172,7 +2172,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
needsOldTxs = needsOldTxs || it.tLogs[0]->tLogVersion < TLogVersion::V4;
}
if(region.satelliteTLogReplicationFactor > 0) {
if(region.satelliteTLogReplicationFactor > 0 && configuration.usableRegions > 1) {
logSystem->tLogs.emplace_back(new LogSet());
if(recr.satelliteFallback) {
logSystem->tLogs[1]->tLogWriteAntiQuorum = region.satelliteTLogWriteAntiQuorumFallback;
@ -2322,7 +2322,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state std::vector<Future<Void>> recoveryComplete;
if(region.satelliteTLogReplicationFactor > 0) {
if(region.satelliteTLogReplicationFactor > 0 && configuration.usableRegions > 1) {
state vector<Future<TLogInterface>> satelliteInitializationReplies;
vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() );
std::vector<Tag> satelliteTags;

View File

@ -1172,6 +1172,10 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
.detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.trackLatest("MasterRecoveryState");
TraceEvent("MasterRecoveryGenerations", self->dbgid)
.detail("ActiveGenerations", 1)
.trackLatest("MasterRecoveryGenerations");
} else if( !newState.oldTLogData.size() && self->recoveryState < RecoveryState::STORAGE_RECOVERED ) {
self->recoveryState = RecoveryState::STORAGE_RECOVERED;
TraceEvent("MasterRecoveryState", self->dbgid)
@ -1324,6 +1328,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
.detail("StatusCode", RecoveryStatus::locking_coordinated_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_coordinated_state])
.detail("TLogs", self->cstate.prevDBState.tLogs.size())
.detail("ActiveGenerations", self->cstate.myDBState.oldTLogData.size() + 1)
.detail("MyRecoveryCount", self->cstate.prevDBState.recoveryCount+2)
.detail("ForceRecovery", self->forceRecovery)
.trackLatest("MasterRecoveryState");
@ -1331,6 +1336,22 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
// TraceEvent("BWReadCoreState", self->dbgid).detail("Epoch", old.epoch).detail("Version", old.epochEnd);
//}
TraceEvent("MasterRecoveryGenerations", self->dbgid)
.detail("ActiveGenerations", self->cstate.myDBState.oldTLogData.size() + 1)
.trackLatest("MasterRecoveryGenerations");
if (self->cstate.myDBState.oldTLogData.size() > CLIENT_KNOBS->MAX_GENERATIONS_OVERRIDE) {
if (self->cstate.myDBState.oldTLogData.size() >= CLIENT_KNOBS->MAX_GENERATIONS) {
TraceEvent(SevError, "RecoveryStoppedTooManyOldGenerations").detail("OldGenerations", self->cstate.myDBState.oldTLogData.size())
.detail("Reason", "Recovery stopped because too many recoveries have happened since the last time the cluster was fully_recovered. Set --knob_max_generations_override on your server processes to a value larger than OldGenerations to resume recovery once the underlying problem has been fixed.");
wait(Future<Void>(Never()));
} else if (self->cstate.myDBState.oldTLogData.size() > CLIENT_KNOBS->RECOVERY_DELAY_START_GENERATION) {
TraceEvent(SevError, "RecoveryDelayedTooManyOldGenerations").detail("OldGenerations", self->cstate.myDBState.oldTLogData.size())
.detail("Reason", "Recovery is delayed because too many recoveries have happened since the last time the cluster was fully_recovered. Set --knob_max_generations_override on your server processes to a value larger than OldGenerations to resume recovery once the underlying problem has been fixed.");
wait(delay(CLIENT_KNOBS->RECOVERY_DELAY_SECONDS_PER_GENERATION*(self->cstate.myDBState.oldTLogData.size() - CLIENT_KNOBS->RECOVERY_DELAY_START_GENERATION)));
}
}
state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->cstate.prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality, &self->forceRecovery);

View File

@ -184,9 +184,11 @@ ArenaBlock* ArenaBlock::create(int dataSize, Reference<ArenaBlock>& next) {
b->bigSize = reqSize;
b->bigUsed = sizeof(ArenaBlock);
if (FLOW_KNOBS && g_trace_depth == 0 &&
if (FLOW_KNOBS && g_allocation_tracing_disabled == 0 &&
nondeterministicRandom()->random01() < (reqSize / FLOW_KNOBS->HUGE_ARENA_LOGGING_BYTES)) {
++g_allocation_tracing_disabled;
hugeArenaSample(reqSize);
--g_allocation_tracing_disabled;
}
g_hugeArenaMemory.fetch_add(reqSize);

View File

@ -457,8 +457,10 @@ void FastAllocator<Size>::getMagazine() {
// FIXME: We should be able to allocate larger magazine sizes here if we
// detect that the underlying system supports hugepages. Using hugepages
// with smaller-than-2MiB magazine sizes strands memory. See issue #909.
if(FLOW_KNOBS && g_trace_depth == 0 && nondeterministicRandom()->random01() < (magazine_size * Size)/FLOW_KNOBS->FAST_ALLOC_LOGGING_BYTES) {
if(FLOW_KNOBS && g_allocation_tracing_disabled == 0 && nondeterministicRandom()->random01() < (magazine_size * Size)/FLOW_KNOBS->FAST_ALLOC_LOGGING_BYTES) {
++g_allocation_tracing_disabled;
TraceEvent("GetMagazineSample").detail("Size", Size).backtrace();
--g_allocation_tracing_disabled;
}
block = (void **)::allocate(magazine_size * Size, false);
#endif

View File

@ -162,6 +162,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( TRACE_EVENT_THROTTLER_MSG_LIMIT, 20000 );
init( MAX_TRACE_FIELD_LENGTH, 495 ); // If the value of this is changed, the corresponding default in Trace.cpp should be changed as well
init( MAX_TRACE_EVENT_LENGTH, 4000 ); // If the value of this is changed, the corresponding default in Trace.cpp should be changed as well
init( ALLOCATION_TRACING_ENABLED, true );
//TDMetrics
init( MAX_METRICS, 600 );

View File

@ -183,6 +183,7 @@ public:
int TRACE_EVENT_THROTTLER_MSG_LIMIT;
int MAX_TRACE_FIELD_LENGTH;
int MAX_TRACE_EVENT_LENGTH;
bool ALLOCATION_TRACING_ENABLED;
//TDMetrics
int64_t MAX_METRIC_SIZE;

View File

@ -156,6 +156,8 @@ public:
virtual void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; }
std::vector<flowGlobalType> globals;
virtual const TLSConfig& getTLSConfig() { return tlsConfig; }
bool useThreadPool;
//private:
@ -220,6 +222,7 @@ public:
Int64MetricHandle countYieldCallsTrue;
Int64MetricHandle countASIOEvents;
Int64MetricHandle countSlowTaskSignals;
Int64MetricHandle countTLSPolicyFailures;
Int64MetricHandle priorityMetric;
DoubleMetricHandle countLaunchTime;
DoubleMetricHandle countReactTime;
@ -853,12 +856,6 @@ struct PromiseTask : public Task, public FastAllocated<PromiseTask> {
// 5MB for loading files into memory
#ifndef TLS_DISABLED
// Verify-callback that unconditionally accepts the peer's certificate.
// Installed when TLS is configured without peer verification; both
// parameters (preverified flag and verify context) are ignored.
bool insecurely_always_accept(bool _1, boost::asio::ssl::verify_context& _2) {
	return true;
}
#endif
Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
: useThreadPool(useThreadPool),
network(this),
@ -900,47 +897,6 @@ Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
}
#ifndef TLS_DISABLED
// Apply a LoadedTLSConfig to a boost::asio SSL context: sets workaround
// options, peer-verification mode and callback, the password callback, and
// installs the certificate chain, CA bundle, and private key when present.
// Throws tls_error (after tracing TLSConfigureError) on any OpenSSL failure.
void ConfigureSSLContext( const LoadedTLSConfig& loaded, boost::asio::ssl::context* context ) {
	try {
		context->set_options(boost::asio::ssl::context::default_workarounds);
		// Require a peer certificate; a missing certificate fails the handshake.
		context->set_verify_mode(boost::asio::ssl::context::verify_peer | boost::asio::ssl::verify_fail_if_no_peer_cert);
		if (loaded.isTLSEnabled()) {
			// Peer certificates are checked against the configured verify-peers rules.
			Reference<TLSPolicy> tlsPolicy = Reference<TLSPolicy>(new TLSPolicy(loaded.getEndpointType()));
			tlsPolicy->set_verify_peers({ loaded.getVerifyPeers() });
			context->set_verify_callback([policy=tlsPolicy](bool preverified, boost::asio::ssl::verify_context& ctx) {
				return policy->verify_peer(preverified, ctx.native_handle());
			});
		} else {
			// TLS not enabled: accept any peer certificate.
			context->set_verify_callback(boost::bind(&insecurely_always_accept, _1, _2));
		}
		// The private-key password is captured by value so later reloads don't re-prompt.
		context->set_password_callback(
			[password=loaded.getPassword()](size_t, boost::asio::ssl::context::password_purpose) {
				return password;
			});
		const std::string& certBytes = loaded.getCertificateBytes();
		if ( certBytes.size() ) {
			context->use_certificate_chain(boost::asio::buffer(certBytes.data(), certBytes.size()));
		}
		const std::string& CABytes = loaded.getCABytes();
		if ( CABytes.size() ) {
			context->add_certificate_authority(boost::asio::buffer(CABytes.data(), CABytes.size()));
		}
		const std::string& keyBytes = loaded.getKeyBytes();
		if (keyBytes.size()) {
			context->use_private_key(boost::asio::buffer(keyBytes.data(), keyBytes.size()), boost::asio::ssl::context::pem);
		}
	} catch (boost::system::system_error& e) {
		TraceEvent("TLSConfigureError").detail("What", e.what()).detail("Value", e.code().value()).detail("WhichMeans", TLSPolicy::ErrorString(e.code()));
		throw tls_error();
	}
}
ACTOR static Future<Void> watchFileForChanges( std::string filename, AsyncTrigger* fileChanged ) {
if (filename == "") {
return Never();
@ -968,7 +924,7 @@ ACTOR static Future<Void> watchFileForChanges( std::string filename, AsyncTrigge
}
}
ACTOR static Future<Void> reloadCertificatesOnChange( TLSConfig config, AsyncVar<Reference<ReferencedObject<boost::asio::ssl::context>>>* contextVar ) {
ACTOR static Future<Void> reloadCertificatesOnChange( TLSConfig config, std::function<void()> onPolicyFailure, AsyncVar<Reference<ReferencedObject<boost::asio::ssl::context>>>* contextVar ) {
if (FLOW_KNOBS->TLS_CERT_REFRESH_DELAY_SECONDS <= 0) {
return Void();
}
@ -992,7 +948,7 @@ ACTOR static Future<Void> reloadCertificatesOnChange( TLSConfig config, AsyncVar
try {
LoadedTLSConfig loaded = wait( config.loadAsync() );
boost::asio::ssl::context context(boost::asio::ssl::context::tls);
ConfigureSSLContext(loaded, &context);
ConfigureSSLContext(loaded, &context, onPolicyFailure);
TraceEvent(SevInfo, "TLSCertificateRefreshSucceeded");
mismatches = 0;
contextVar->set(ReferencedObject<boost::asio::ssl::context>::from(std::move(context)));
@ -1015,9 +971,10 @@ void Net2::initTLS() {
#ifndef TLS_DISABLED
try {
boost::asio::ssl::context newContext(boost::asio::ssl::context::tls);
ConfigureSSLContext( tlsConfig.loadSync(), &newContext );
auto onPolicyFailure = [this]() { this->countTLSPolicyFailures++; };
ConfigureSSLContext( tlsConfig.loadSync(), &newContext, onPolicyFailure );
sslContextVar.set(ReferencedObject<boost::asio::ssl::context>::from(std::move(newContext)));
backgroundCertRefresh = reloadCertificatesOnChange( tlsConfig, &sslContextVar );
backgroundCertRefresh = reloadCertificatesOnChange( tlsConfig, onPolicyFailure, &sslContextVar );
} catch (Error& e) {
TraceEvent("Net2TLSInitError").error(e);
throw tls_error();
@ -1053,6 +1010,7 @@ void Net2::initMetrics() {
countASIOEvents.init(LiteralStringRef("Net2.CountASIOEvents"));
countYieldCallsTrue.init(LiteralStringRef("Net2.CountYieldCallsTrue"));
countSlowTaskSignals.init(LiteralStringRef("Net2.CountSlowTaskSignals"));
countTLSPolicyFailures.init(LiteralStringRef("Net2.CountTLSPolicyFailures"));
priorityMetric.init(LiteralStringRef("Net2.Priority"));
awakeMetric.init(LiteralStringRef("Net2.Awake"));
slowTaskMetric.init(LiteralStringRef("Net2.SlowTask"));

View File

@ -78,7 +78,10 @@ public:
// Deserialize `items` from this reader's underlying buffer.
// First validates that the buffer's embedded file identifier matches the
// expected one; on mismatch, a SevError trace is logged with both values
// BEFORE asserting, so the mismatch is diagnosable from logs rather than
// from a bare assertion failure. (The old unconditional ASSERT ran before
// the trace could be emitted, making the diagnostic unreachable.)
void deserialize(FileIdentifier file_identifier, Items&... items) {
	const uint8_t* data = static_cast<ReaderImpl*>(this)->data();
	LoadContext<ReaderImpl> context(static_cast<ReaderImpl*>(this));
	if(read_file_identifier(data) != file_identifier) {
		TraceEvent(SevError, "MismatchedFileIdentifier").detail("Expected", file_identifier).detail("Read", read_file_identifier(data));
		ASSERT(false);
	}
	load_members(data, context, items...);
}

View File

@ -1821,16 +1821,29 @@ bool createDirectory( std::string const& directory ) {
if ( mkdir( directory.substr(0, sep).c_str(), 0755 ) != 0 ) {
if (errno == EEXIST)
continue;
auto mkdirErrno = errno;
// check if directory already exists
// necessary due to old kernel bugs
struct stat s;
const char* dirname = directory.c_str();
if (stat(dirname, &s) != -1 && S_ISDIR(s.st_mode)) {
TraceEvent("DirectoryAlreadyExists").detail("Directory", dirname).detail("IgnoredError", mkdirErrno);
continue;
}
Error e;
if(errno == EACCES) {
if (mkdirErrno == EACCES) {
e = file_not_writable();
}
else {
} else {
e = systemErrorCodeToError();
}
TraceEvent(SevError, "CreateDirectory").detail("Directory", directory).GetLastError().error(e);
TraceEvent(SevError, "CreateDirectory")
.detail("Directory", directory)
.detailf("UnixErrorCode", "%x", errno)
.detail("UnixError", strerror(mkdirErrno))
.error(e);
throw e;
}
createdDirectory();

View File

@ -95,6 +95,6 @@ ACTOR Future<Void> traceCounters(std::string traceEventName, UID traceEventID, d
}
last_interval = now();
wait(delay(interval));
wait(delay(interval, TaskPriority::FlushTrace));
}
}

View File

@ -101,6 +101,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
.detail("ConnectionsEstablished", (double) (netData.countConnEstablished - statState->networkState.countConnEstablished) / currentStats.elapsed)
.detail("ConnectionsClosed", ((netData.countConnClosedWithError - statState->networkState.countConnClosedWithError) + (netData.countConnClosedWithoutError - statState->networkState.countConnClosedWithoutError)) / currentStats.elapsed)
.detail("ConnectionErrors", (netData.countConnClosedWithError - statState->networkState.countConnClosedWithError) / currentStats.elapsed)
.detail("TLSPolicyFailures", (netData.countTLSPolicyFailures - statState->networkState.countTLSPolicyFailures) / currentStats.elapsed)
.trackLatest(eventName);
TraceEvent("MemoryMetrics")

View File

@ -80,6 +80,7 @@ struct NetworkData {
int64_t countConnEstablished;
int64_t countConnClosedWithError;
int64_t countConnClosedWithoutError;
int64_t countTLSPolicyFailures;
double countLaunchTime;
double countReactTime;
@ -107,6 +108,7 @@ struct NetworkData {
countConnEstablished = Int64Metric::getValueOrDefault(LiteralStringRef("Net2.CountConnEstablished"));
countConnClosedWithError = Int64Metric::getValueOrDefault(LiteralStringRef("Net2.CountConnClosedWithError"));
countConnClosedWithoutError = Int64Metric::getValueOrDefault(LiteralStringRef("Net2.CountConnClosedWithoutError"));
countTLSPolicyFailures = Int64Metric::getValueOrDefault(LiteralStringRef("Net2.CountTLSPolicyFailures"));
countLaunchTime = DoubleMetric::getValueOrDefault(LiteralStringRef("Net2.CountLaunchTime"));
countReactTime = DoubleMetric::getValueOrDefault(LiteralStringRef("Net2.CountReactTime"));
countFileLogicalWrites = Int64Metric::getValueOrDefault(LiteralStringRef("AsyncFile.CountLogicalWrites"));

View File

@ -25,7 +25,14 @@
// To force typeinfo to only be emitted once.
TLSPolicy::~TLSPolicy() {}
#ifndef TLS_DISABLED
#ifdef TLS_DISABLED
// Stub used when the binary is built with TLS_DISABLED: there is no SSL
// context to inspect, so just report that printing is unavailable.
void LoadedTLSConfig::print(FILE *fp) {
	fprintf(fp, "Cannot print LoadedTLSConfig. TLS support is not enabled.\n");
}
#else // TLS is enabled
#include <algorithm>
#include <cstring>
#include <exception>
@ -63,7 +70,7 @@ std::vector<std::string> LoadedTLSConfig::getVerifyPeers() const {
if (tlsVerifyPeers.size()) {
return tlsVerifyPeers;
}
std::string envVerifyPeers;
if (platform::getEnvironmentVar("FDB_TLS_VERIFY_PEERS", envVerifyPeers)) {
return {envVerifyPeers};
@ -76,12 +83,90 @@ std::string LoadedTLSConfig::getPassword() const {
if (tlsPassword.size()) {
return tlsPassword;
}
std::string envPassword;
platform::getEnvironmentVar("FDB_TLS_PASSWORD", envPassword);
return envPassword;
}
// Print the configured leaf certificate, plus every certificate in the
// chain built from the configured CA store, to the caller-supplied FILE*.
// Rethrows (after printing a short notice) if the configured
// certificates/keys cannot be loaded into an SSL context.
void LoadedTLSConfig::print(FILE* fp) {
	int num_certs = 0;
	boost::asio::ssl::context context(boost::asio::ssl::context::tls);
	try {
		ConfigureSSLContext(*this, &context);
	} catch (Error& e) {
		fprintf(fp, "There was an error in loading the certificate chain.\n");
		throw;
	}

	// Build the verification chain for the loaded leaf certificate.
	X509_STORE* store = SSL_CTX_get_cert_store(context.native_handle());
	X509_STORE_CTX* store_ctx = X509_STORE_CTX_new();
	X509* cert = SSL_CTX_get0_certificate(context.native_handle());
	X509_STORE_CTX_init(store_ctx, store, cert, NULL);

	X509_verify_cert(store_ctx);
	STACK_OF(X509)* chain = X509_STORE_CTX_get0_chain(store_ctx);

	X509_print_fp(fp, cert);

	num_certs = sk_X509_num(chain);
	if (num_certs) {
		for ( int i = 0; i < num_certs; i++ ) {
			// BUGFIX: the separator must go to fp, not stdout (was printf("\n")).
			fprintf(fp, "\n");
			X509* cert = sk_X509_value(chain, i);
			X509_print_fp(fp, cert);
		}
	}

	X509_STORE_CTX_free(store_ctx);
}
// Apply a LoadedTLSConfig to a boost::asio SSL context: sets workaround
// options, peer-verification mode and callback, the password callback, and
// installs the CA bundle, private key, and certificate chain when present.
// onPolicyFailure is invoked each time the TLSPolicy rejects a peer (used
// by Net2 to count policy failures). Throws tls_error (after tracing
// TLSConfigureError) on any OpenSSL failure.
void ConfigureSSLContext( const LoadedTLSConfig& loaded, boost::asio::ssl::context* context, std::function<void()> onPolicyFailure ) {
	try {
		context->set_options(boost::asio::ssl::context::default_workarounds);
		// Require a peer certificate; a missing certificate fails the handshake.
		context->set_verify_mode(boost::asio::ssl::context::verify_peer | boost::asio::ssl::verify_fail_if_no_peer_cert);

		if (loaded.isTLSEnabled()) {
			Reference<TLSPolicy> tlsPolicy = Reference<TLSPolicy>(new TLSPolicy(loaded.getEndpointType()));
			tlsPolicy->set_verify_peers({ loaded.getVerifyPeers() });

			context->set_verify_callback([policy=tlsPolicy, onPolicyFailure](bool preverified, boost::asio::ssl::verify_context& ctx) {
				bool success = policy->verify_peer(preverified, ctx.native_handle());
				if (!success) {
					// Report the rejection to the caller-provided hook.
					onPolicyFailure();
				}
				return success;
			});
		} else {
			// Insecurely always accept if TLS is not enabled.
			context->set_verify_callback([](bool, boost::asio::ssl::verify_context&){ return true; });
		}

		// The private-key password is captured by value so later reloads don't re-prompt.
		context->set_password_callback(
			[password=loaded.getPassword()](size_t, boost::asio::ssl::context::password_purpose) {
				return password;
			});

		const std::string& CABytes = loaded.getCABytes();
		if ( CABytes.size() ) {
			context->add_certificate_authority(boost::asio::buffer(CABytes.data(), CABytes.size()));
		}

		const std::string& keyBytes = loaded.getKeyBytes();
		if (keyBytes.size()) {
			context->use_private_key(boost::asio::buffer(keyBytes.data(), keyBytes.size()), boost::asio::ssl::context::pem);
		}

		const std::string& certBytes = loaded.getCertificateBytes();
		if ( certBytes.size() ) {
			context->use_certificate_chain(boost::asio::buffer(certBytes.data(), certBytes.size()));
		}
	} catch (boost::system::system_error& e) {
		TraceEvent("TLSConfigureError").detail("What", e.what()).detail("Value", e.code().value()).detail("WhichMeans", TLSPolicy::ErrorString(e.code()));
		throw tls_error();
	}
}
std::string TLSConfig::getCertificatePathSync() const {
if (tlsCertPath.size()) {
return tlsCertPath;
@ -96,7 +181,7 @@ std::string TLSConfig::getCertificatePathSync() const {
if( fileExists(defaultCertFileName) ) {
return defaultCertFileName;
}
if( fileExists( joinPath(platform::getDefaultConfigPath(), defaultCertFileName) ) ) {
return joinPath(platform::getDefaultConfigPath(), defaultCertFileName);
}
@ -118,7 +203,7 @@ std::string TLSConfig::getKeyPathSync() const {
if( fileExists(defaultCertFileName) ) {
return defaultCertFileName;
}
if( fileExists( joinPath(platform::getDefaultConfigPath(), defaultCertFileName) ) ) {
return joinPath(platform::getDefaultConfigPath(), defaultCertFileName);
}

View File

@ -27,6 +27,7 @@
#pragma once
#include <cstdio>
#include <map>
#include <string>
#include <vector>
@ -120,6 +121,8 @@ public:
return endpointType != TLSEndpointType::UNSET;
}
void print(FILE* fp);
PRIVATE_EXCEPT_FOR_TLSCONFIG_CPP:
std::string tlsCertBytes, tlsKeyBytes, tlsCABytes;
std::string tlsPassword;
@ -217,6 +220,11 @@ PRIVATE_EXCEPT_FOR_TLSCONFIG_CPP:
TLSEndpointType endpointType = TLSEndpointType::UNSET;
};
#ifndef TLS_DISABLED
namespace boost { namespace asio { namespace ssl { struct context; }}}
void ConfigureSSLContext(const LoadedTLSConfig& loaded, boost::asio::ssl::context* context, std::function<void()> onPolicyFailure = [](){});
#endif
class TLSPolicy : ReferenceCounted<TLSPolicy> {
public:

View File

@ -43,7 +43,15 @@
#undef min
#endif
thread_local int g_trace_depth = 0;
// Allocations can only be logged when this value is 0.
// Anybody that needs to disable tracing should increment this by 1 for the duration
// that they need the disabling to be in effect.
//
// This is done for multiple reasons:
// 1. To avoid recursion in the allocation tracing when each trace event does an allocation
// 2. To avoid a historically documented but unknown crash that occurs when logging allocations
// during an open trace event
thread_local int g_allocation_tracing_disabled = 1;
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
@ -733,14 +741,12 @@ TraceEvent& TraceEvent::operator=(TraceEvent &&ev) {
}
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) {
g_trace_depth++;
setMaxFieldLength(0);
setMaxEventLength(0);
}
TraceEvent::TraceEvent( Severity severity, const char* type, UID id )
: id(id), type(type), severity(severity), initialized(false), logged(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
g_trace_depth++;
setMaxFieldLength(0);
setMaxEventLength(0);
}
@ -750,7 +756,6 @@ TraceEvent::TraceEvent( TraceInterval& interval, UID id )
initialized(false), logged(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= interval.severity) {
g_trace_depth++;
setMaxFieldLength(0);
setMaxEventLength(0);
@ -762,7 +767,6 @@ TraceEvent::TraceEvent( Severity severity, TraceInterval& interval, UID id )
initialized(false), logged(false),
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
g_trace_depth++;
setMaxFieldLength(0);
setMaxEventLength(0);
@ -784,9 +788,12 @@ bool TraceEvent::init() {
if(initialized) {
return enabled;
}
initialized = true;
initialized = true;
ASSERT(*type != '\0');
++g_allocation_tracing_disabled;
enabled = enabled && ( !g_network || severity >= FLOW_KNOBS->MIN_TRACE_SEVERITY );
// Backstop to throttle very spammy trace events
@ -829,6 +836,7 @@ bool TraceEvent::init() {
tmpEventMetric = nullptr;
}
--g_allocation_tracing_disabled;
return enabled;
}
@ -858,6 +866,7 @@ TraceEvent& TraceEvent::errorImpl(class Error const& error, bool includeCancelle
TraceEvent& TraceEvent::detailImpl( std::string&& key, std::string&& value, bool writeEventMetricField) {
init();
if (enabled) {
++g_allocation_tracing_disabled;
if( maxFieldLength >= 0 && value.size() > maxFieldLength ) {
value = value.substr(0, maxFieldLength) + "...";
}
@ -872,20 +881,27 @@ TraceEvent& TraceEvent::detailImpl( std::string&& key, std::string&& value, bool
TraceEvent(g_network && g_network->isSimulated() ? SevError : SevWarnAlways, "TraceEventOverflow").setMaxEventLength(1000).detail("TraceFirstBytes", fields.toString().substr(300));
enabled = false;
}
--g_allocation_tracing_disabled;
}
return *this;
}
void TraceEvent::setField(const char* key, int64_t value) {
++g_allocation_tracing_disabled;
tmpEventMetric->setField(key, value);
--g_allocation_tracing_disabled;
}
void TraceEvent::setField(const char* key, double value) {
++g_allocation_tracing_disabled;
tmpEventMetric->setField(key, value);
--g_allocation_tracing_disabled;
}
void TraceEvent::setField(const char* key, const std::string& value) {
++g_allocation_tracing_disabled;
tmpEventMetric->setField(key, Standalone<StringRef>(value));
--g_allocation_tracing_disabled;
}
TraceEvent& TraceEvent::detailf( std::string key, const char* valueFormat, ... ) {
@ -1016,6 +1032,7 @@ TraceEvent& TraceEvent::backtrace(const std::string& prefix) {
void TraceEvent::log() {
if(!logged) {
init();
++g_allocation_tracing_disabled;
try {
if (enabled) {
fields.mutate(timeIndex).second = format("%.6f", TraceEvent::getCurrentTime());
@ -1049,8 +1066,8 @@ void TraceEvent::log() {
TraceEvent(SevError, "TraceEventLoggingError").error(e,true);
}
delete tmpEventMetric;
g_trace_depth--;
logged = true;
--g_allocation_tracing_disabled;
}
}
@ -1061,8 +1078,14 @@ TraceEvent::~TraceEvent() {
thread_local bool TraceEvent::networkThread = false;
// Mark the calling thread as the network thread (idempotent). On first call
// only: optionally enable allocation tracing (by decrementing the
// thread-local disable counter, gated on the ALLOCATION_TRACING_ENABLED
// knob) and allocate the trace-event throttler cache.
// The literal text here previously performed the allocation and set
// networkThread unconditionally BEFORE the guarded block, which made the
// guard dead, allocated the throttler cache twice (leaking the first), and
// prevented allocation tracing from ever being enabled.
void TraceEvent::setNetworkThread() {
	if(!networkThread) {
		if(FLOW_KNOBS->ALLOCATION_TRACING_ENABLED) {
			--g_allocation_tracing_disabled;
		}
		traceEventThrottlerCache = new TransientThresholdMetricSample<Standalone<StringRef>>(FLOW_KNOBS->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT);
		networkThread = true;
	}
}
bool TraceEvent::isNetworkThread() {

View File

@ -43,7 +43,7 @@ inline int fastrand() {
//inline static bool TRACE_SAMPLE() { return fastrand()<16; }
inline static bool TRACE_SAMPLE() { return false; }
extern thread_local int g_trace_depth;
extern thread_local int g_allocation_tracing_disabled;
enum Severity {
SevVerbose = 0,

View File

@ -579,6 +579,7 @@ struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>
bool isReady() const { return !queue.empty() || error.isValid(); }
bool isError() const { return queue.empty() && error.isValid(); } // the *next* thing queued is an error
uint32_t size() const { return queue.size(); }
T pop() {
if (queue.empty()) {

View File

@ -199,7 +199,6 @@ Future<T> timeoutError( Future<T> what, double time, TaskPriority taskID = TaskP
}
}
ACTOR template <class T>
Future<T> delayed( Future<T> what, double time = 0.0, TaskPriority taskID = TaskPriority::DefaultDelay ) {
try {
@ -856,6 +855,22 @@ Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double co
Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
Future<Void> lowPriorityDelay( double const& waitTime );
// Race `what` against a timeout of `time` seconds (scheduled via
// lowPriorityDelay). If the timeout fires first, trace IoTimeoutError at
// SevError and throw io_timeout(); in simulation the error is tagged as an
// injected fault so it is not treated as a genuine failure.
ACTOR template <class T>
Future<T> ioTimeoutError( Future<T> what, double time ) {
	state Future<Void> end = lowPriorityDelay( time );
	choose {
		when( T t = wait( what ) ) { return t; }
		when( wait( end ) ) {
			Error err = io_timeout();
			if(g_network->isSimulated()) {
				// Mark as injected so simulation does not count this as a real bug.
				err = err.asInjectedFault();
			}
			TraceEvent(SevError, "IoTimeoutError").error(err);
			throw err;
		}
	}
}
ACTOR template <class T>
Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {
try {

View File

@ -56,7 +56,6 @@ enum class TaskPriority {
ClusterController = 8650,
MasterTLogRejoin = 8646,
ProxyStorageRejoin = 8645,
ProxyCommitDispatcher = 8640,
TLogQueuingMetrics = 8620,
TLogPop = 8610,
TLogPeekReply = 8600,
@ -74,7 +73,7 @@ enum class TaskPriority {
TLogConfirmRunningReply = 8530,
TLogConfirmRunning = 8520,
ProxyGRVTimer = 8510,
ProxyGetConsistentReadVersion = 8500,
GetConsistentReadVersion = 8500,
DefaultPromiseEndpoint = 8000,
DefaultOnMainThread = 7500,
DefaultDelay = 7010,
@ -491,6 +490,9 @@ public:
virtual void initTLS() {}
// TLS must be initialized before using the network
virtual const TLSConfig& getTLSConfig() = 0;
// Return the TLS Configuration
virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total) = 0;
//Gets the number of free and total bytes available on the disk which contains directory

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{3742289A-DBB1-4931-B01E-45C5BBB689F0}'
Id='{F7603B3D-766D-4C5C-906E-4F1FD3BEF455}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'