Merge branch 'release-6.0' of github.com:apple/foundationdb

# Conflicts:
#	fdbrpc/TLSConnection.actor.cpp
#	versions.target
A.J. Beamon 2018-08-10 14:31:58 -07:00
commit 574c5576a2
26 changed files with 310 additions and 224 deletions

View File

@@ -85,12 +85,12 @@ FDBLibTLSSession::FDBLibTLSSession(Reference<FDBLibTLSPolicy> policy, bool is_cl
 		throw std::runtime_error("FDBLibTLSServerError");
 	}
 	if (tls_configure(tls_sctx, policy->tls_cfg) == -1) {
-		TraceEvent(SevError, "FDBLibTLSConfigureError", uid).detail("LibTLSErrorMessage", tls_error(tls_ctx));
+		TraceEvent(SevError, "FDBLibTLSConfigureError", uid).detail("LibTLSErrorMessage", tls_error(tls_sctx));
 		tls_free(tls_sctx);
 		throw std::runtime_error("FDBLibTLSConfigureError");
 	}
 	if (tls_accept_cbs(tls_sctx, &tls_ctx, tls_read_func, tls_write_func, this) == -1) {
-		TraceEvent(SevError, "FDBLibTLSAcceptError", uid).detail("LibTLSErrorMessage", tls_error(tls_ctx));
+		TraceEvent(SevError, "FDBLibTLSAcceptError", uid).detail("LibTLSErrorMessage", tls_error(tls_sctx));
 		tls_free(tls_sctx);
 		throw std::runtime_error("FDBLibTLSAcceptError");
 	}

View File

@@ -420,11 +420,35 @@
 "total_disk_used_bytes":0,
 "total_kv_size_bytes":0,
 "partitions_count":2,
 "moving_data":{
 "total_written_bytes":0,
 "in_flight_bytes":0,
-"in_queue_bytes":0
+"in_queue_bytes":0,
+"highest_priority":0
 },
+"team_trackers":[
+{
+"primary":true,
+"in_flight_bytes":0,
+"unhealthy_servers":0,
+"state":{
+"healthy":true,
+"min_replicas_remaining":0,
+"name":{
+"$enum":[
+"initializing",
+"missing_data",
+"healing",
+"healthy_repartitioning",
+"healthy_removing_server",
+"healthy_rebalancing",
+"healthy"
+]
+},
+"description":""
+}
+}
+],
 "least_operating_space_bytes_storage_server":0,
 "max_machine_failures_without_losing_data":0
 },

View File

@@ -10,38 +10,38 @@ macOS
 The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
 
-* `FoundationDB-6.0.4.pkg <https://www.foundationdb.org/downloads/6.0.4/macOS/installers/FoundationDB-6.0.4.pkg>`_
+* `FoundationDB-6.0.5.pkg <https://www.foundationdb.org/downloads/6.0.5/macOS/installers/FoundationDB-6.0.5.pkg>`_
 
 Ubuntu
 ------
 
 The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
 
-* `foundationdb-clients-6.0.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.4/ubuntu/installers/foundationdb-clients_6.0.4-1_amd64.deb>`_
-* `foundationdb-server-6.0.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.4/ubuntu/installers/foundationdb-server_6.0.4-1_amd64.deb>`_ (depends on the clients package)
+* `foundationdb-clients-6.0.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.5/ubuntu/installers/foundationdb-clients_6.0.5-1_amd64.deb>`_
+* `foundationdb-server-6.0.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.0.5/ubuntu/installers/foundationdb-server_6.0.5-1_amd64.deb>`_ (depends on the clients package)
 
 RHEL/CentOS EL6
 ---------------
 
 The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
 
-* `foundationdb-clients-6.0.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.4/rhel6/installers/foundationdb-clients-6.0.4-1.el6.x86_64.rpm>`_
-* `foundationdb-server-6.0.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.4/rhel6/installers/foundationdb-server-6.0.4-1.el6.x86_64.rpm>`_ (depends on the clients package)
+* `foundationdb-clients-6.0.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.5/rhel6/installers/foundationdb-clients-6.0.5-1.el6.x86_64.rpm>`_
+* `foundationdb-server-6.0.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.5/rhel6/installers/foundationdb-server-6.0.5-1.el6.x86_64.rpm>`_ (depends on the clients package)
 
 RHEL/CentOS EL7
 ---------------
 
 The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
 
-* `foundationdb-clients-6.0.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.4/rhel7/installers/foundationdb-clients-6.0.4-1.el7.x86_64.rpm>`_
-* `foundationdb-server-6.0.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.4/rhel7/installers/foundationdb-server-6.0.4-1.el7.x86_64.rpm>`_ (depends on the clients package)
+* `foundationdb-clients-6.0.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.5/rhel7/installers/foundationdb-clients-6.0.5-1.el7.x86_64.rpm>`_
+* `foundationdb-server-6.0.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.0.5/rhel7/installers/foundationdb-server-6.0.5-1.el7.x86_64.rpm>`_ (depends on the clients package)
 
 Windows
 -------
 
 The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
 
-* `foundationdb-6.0.4-x64.msi <https://www.foundationdb.org/downloads/6.0.4/windows/installers/foundationdb-6.0.4-x64.msi>`_
+* `foundationdb-6.0.5-x64.msi <https://www.foundationdb.org/downloads/6.0.5/windows/installers/foundationdb-6.0.5-x64.msi>`_
 
 API Language Bindings
 =====================
@@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
 
 If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
 
-* `foundationdb-6.0.4.tar.gz <https://www.foundationdb.org/downloads/6.0.4/bindings/python/foundationdb-6.0.4.tar.gz>`_
+* `foundationdb-6.0.5.tar.gz <https://www.foundationdb.org/downloads/6.0.5/bindings/python/foundationdb-6.0.5.tar.gz>`_
 
 Ruby 1.9.3/2.0.0+
 -----------------
 
-* `fdb-6.0.4.gem <https://www.foundationdb.org/downloads/6.0.4/bindings/ruby/fdb-6.0.4.gem>`_
+* `fdb-6.0.5.gem <https://www.foundationdb.org/downloads/6.0.5/bindings/ruby/fdb-6.0.5.gem>`_
 
 Java 8+
 -------
 
-* `fdb-java-6.0.4.jar <https://www.foundationdb.org/downloads/6.0.4/bindings/java/fdb-java-6.0.4.jar>`_
-* `fdb-java-6.0.4-javadoc.jar <https://www.foundationdb.org/downloads/6.0.4/bindings/java/fdb-java-6.0.4-javadoc.jar>`_
+* `fdb-java-6.0.5.jar <https://www.foundationdb.org/downloads/6.0.5/bindings/java/fdb-java-6.0.5.jar>`_
+* `fdb-java-6.0.5-javadoc.jar <https://www.foundationdb.org/downloads/6.0.5/bindings/java/fdb-java-6.0.5-javadoc.jar>`_
 
 Go 1.1+
 -------

View File

@@ -27,6 +27,7 @@ Fixes
 -----
 
 * A client could fail to connect to a cluster when the cluster was upgraded to a version compatible with the client. This affected upgrades that were using the multi-version client to maintain compatibility with both versions of the cluster. `(PR #637) <https://github.com/apple/foundationdb/pull/637>`_
+* Incorrect accounting of incompatible connections led to occasional assertion failures. `(PR #637) <https://github.com/apple/foundationdb/pull/637>`_
 
 5.2.6
 =====

View File

@@ -2,7 +2,7 @@
 Release Notes
 #############
 
-6.0.4
+6.0.5
 =====
 
 Features
@@ -13,6 +13,7 @@ Features
 * The TLS plugin is now statically linked into the client and server binaries and no longer requires a separate library. `(Issue #436) <https://github.com/apple/foundationdb/issues/436>`_
 * TLS peer verification now supports verifying on Subject Alternative Name. `(Issue #514) <https://github.com/apple/foundationdb/issues/514>`_
 * TLS peer verification now supports suffix matching by field. `(Issue #515) <https://github.com/apple/foundationdb/issues/515>`_
+* TLS certificates are automatically reloaded after being updated. [6.0.5] `(Issue #505) <https://github.com/apple/foundationdb/issues/505>`_
 
 Performance
 -----------
@@ -23,6 +24,7 @@ Performance
 * Clients optimistically assume the first leader reply from a coordinator is correct. `(PR #425) <https://github.com/apple/foundationdb/pull/425>`_
 * Network connections are now closed after no interface needs the connection. [6.0.1] `(Issue #375) <https://github.com/apple/foundationdb/issues/375>`_
 * Significantly improved the CPU efficiency of copying mutations to transaction logs during recovery. [6.0.2] `(PR #595) <https://github.com/apple/foundationdb/pull/595>`_
+* A cluster configured with usable_regions=2 did not limit the rate at which it could copy data from the primary DC to the remote DC. This caused poor performance when recovering from a DC outage. [6.0.5] `(PR #673) <https://github.com/apple/foundationdb/pull/673>`_
 
 Fixes
 -----
@@ -36,12 +38,19 @@ Fixes
 * A client could fail to connect to a cluster when the cluster was upgraded to a version compatible with the client. This affected upgrades that were using the multi-version client to maintain compatibility with both versions of the cluster. [6.0.4] `(PR #637) <https://github.com/apple/foundationdb/pull/637>`_
 * A large number of concurrent read attempts could bring the database down after a cluster reboot. [6.0.4] `(PR #650) <https://github.com/apple/foundationdb/pull/650>`_
 * Automatic suppression of trace events which occur too frequently was happening before trace events were suppressed by other mechanisms. [6.0.4] `(PR #656) <https://github.com/apple/foundationdb/pull/656>`_
+* After a recovery, the rate at which transaction logs made mutations durable to disk was around 5 times slower than normal. [6.0.5] `(PR #666) <https://github.com/apple/foundationdb/pull/666>`_
+* Clusters configured to use TLS could get stuck spending all of their CPU opening new connections. [6.0.5] `(PR #666) <https://github.com/apple/foundationdb/pull/666>`_
+* Configuring usable_regions=2 on a cluster with a large amount of data caused commits to pause for a few seconds. [6.0.5] `(PR #687) <https://github.com/apple/foundationdb/pull/687>`_
+* On clusters configured with usable_regions=2, status reported no replicas remaining when the primary DC was still healthy. [6.0.5] `(PR #687) <https://github.com/apple/foundationdb/pull/687>`_
+* Clients could crash when passing in TLS options. [6.0.5] `(PR #649) <https://github.com/apple/foundationdb/pull/649>`_
+* A mismatched TLS certificate and key set could cause the server to crash. [6.0.5] `(PR #689) <https://github.com/apple/foundationdb/pull/689>`_
 
 Status
 ------
 
-* The replication factor in status JSON is stored under "redundancy_mode" instead of "redundancy":"factor". `(PR #492) <https://github.com/apple/foundationdb/pull/492>`_
-* Additional metrics for storage server lag as well as the number of watches and mutation count have been added and are exposed through status. `(PR #521) <https://github.com/apple/foundationdb/pull/521>`_
+* The replication factor in status JSON is stored under ``redundancy_mode`` instead of ``redundancy.factor``. `(PR #492) <https://github.com/apple/foundationdb/pull/492>`_
+* The metric ``data_version_lag`` has been replaced by ``data_lag.versions`` and ``data_lag.seconds``. `(PR #521) <https://github.com/apple/foundationdb/pull/521>`_
+* Additional metrics for the number of watches and mutation count have been added and are exposed through status. `(PR #521) <https://github.com/apple/foundationdb/pull/521>`_
 
 Bindings
@@ -56,6 +65,7 @@ Other Changes
 
 * Does not support upgrades from any version older than 5.0.
 * Normalized the capitalization of trace event names and attributes. `(PR #455) <https://github.com/apple/foundationdb/pull/455>`_
+* Increased the memory requirements of the transaction log by 400MB. [6.0.5] `(PR #673) <https://github.com/apple/foundationdb/pull/673>`_
 
 Earlier release notes
 ---------------------

View File

@@ -168,7 +168,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
 	init( BLOBSTORE_MAX_SEND_BYTES_PER_SECOND, 1e9 );
 	init( BLOBSTORE_MAX_RECV_BYTES_PER_SECOND, 1e9 );
-	init( BLOBSTORE_LIST_REQUESTS_PER_SECOND, 25 );
+	init( BLOBSTORE_LIST_REQUESTS_PER_SECOND, 200 );
 	init( BLOBSTORE_WRITE_REQUESTS_PER_SECOND, 50 );
 	init( BLOBSTORE_READ_REQUESTS_PER_SECOND, 100 );
 	init( BLOBSTORE_DELETE_REQUESTS_PER_SECOND, 200 );

View File

@@ -910,12 +910,13 @@ void MultiVersionApi::runOnExternalClients(std::function<void(Reference<ClientIn
 			}
 		}
 		catch(Error &e) {
-			TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", c->second->libPath);
 			if(e.code() == error_code_external_client_already_loaded) {
+				TraceEvent(SevInfo, "ExternalClientAlreadyLoaded").error(e).detail("LibPath", c->second->libPath);
 				c = externalClients.erase(c);
 				continue;
 			}
 			else {
+				TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", c->second->libPath);
 				c->second->failed = true;
 				newFailure = true;
 			}

View File

@@ -174,7 +174,7 @@ TLSNetworkConnections::TLSNetworkConnections( Reference<TLSOptions> options ) :
 Future<Reference<IConnection>> TLSNetworkConnections::connect( NetworkAddress toAddr, std::string host) {
 	if ( toAddr.isTLS() ) {
 		NetworkAddress clearAddr( toAddr.ip, toAddr.port, toAddr.isPublic(), false );
-		TraceEvent("TLSConnectionConnecting").detail("ToAddr", toAddr);
+		TraceEvent("TLSConnectionConnecting").suppressFor(1.0).detail("ToAddr", toAddr);
 		// For FDB<->FDB connections, we don't have hostnames and can't verify IP
 		// addresses against certificates, so we have our own peer verifying logic
 		// to use. For FDB<->external system connections, we can use the standard
@@ -205,11 +205,11 @@ Reference<IListener> TLSNetworkConnections::listen( NetworkAddress localAddr ) {
 void TLSOptions::set_cert_file( std::string const& cert_file ) {
 	try {
-		TraceEvent("TLSConnectionSettingCertFile").detail("CertFilePath", cert_file);
+		TraceEvent("TLSConnectionSettingCertFile").suppressFor(1.0).detail("CertFilePath", cert_file);
 		policyInfo.cert_path = cert_file;
 		set_cert_data( readFileBytes( cert_file, CERT_FILE_MAX_SIZE ) );
 	} catch ( Error& ) {
-		TraceEvent(SevError, "TLSOptionsSetCertFileError").detail("Filename", cert_file);
+		TraceEvent(SevError, "TLSOptionsSetCertFileError").suppressFor(1.0).detail("Filename", cert_file);
 		throw;
 	}
 }
@@ -437,12 +437,12 @@ Reference<ITLSPolicy> TLSOptions::get_policy(PolicyType type) {
 void TLSOptions::init_plugin() {
-	TraceEvent("TLSConnectionLoadingPlugin").detail("Plugin", tlsPluginName);
+	TraceEvent("TLSConnectionLoadingPlugin").suppressFor(1.0).detail("Plugin", tlsPluginName);
 	plugin = loadPlugin<ITLSPlugin>( tlsPluginName );
 	if ( !plugin ) {
-		TraceEvent(SevError, "TLSConnectionPluginInitError").detail("Plugin", tlsPluginName).GetLastError();
+		TraceEvent(SevError, "TLSConnectionPluginInitError").suppressFor(1.0).detail("Plugin", tlsPluginName).GetLastError();
 		throw tls_error();
 	}

View File

@@ -2066,7 +2066,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
 	state Optional<TLogInterface> primaryLog;
 	state Optional<TLogInterface> remoteLog;
-	if(self->db.serverInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED) {
+	if(self->db.serverInfo->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
 		for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
 			if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
 				for(auto& tLog : logSet.tLogs) {

View File

@@ -204,10 +204,11 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
 	state std::set<LeaderInfo> availableCandidates;
 	state std::set<LeaderInfo> availableLeaders;
 	state Optional<LeaderInfo> currentNominee;
-	state vector<ReplyPromise<Optional<LeaderInfo>>> notify;
+	state Deque<ReplyPromise<Optional<LeaderInfo>>> notify;
 	state Future<Void> nextInterval = delay( 0 );
 	state double candidateDelay = SERVER_KNOBS->CANDIDATE_MIN_DELAY;
 	state int leaderIntervalCount = 0;
+	state Future<Void> notifyCheck = delay(SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / SERVER_KNOBS->MIN_NOTIFICATIONS);
 
 	loop choose {
 		when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
@@ -301,6 +302,13 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
 				availableCandidates.clear();
 			}
 		}
+		when( Void _ = wait(notifyCheck) ) {
+			notifyCheck = delay( SERVER_KNOBS->NOTIFICATION_FULL_CLEAR_TIME / std::max<double>(SERVER_KNOBS->MIN_NOTIFICATIONS, notify.size()) );
+			if(!notify.empty() && currentNominee.present()) {
+				notify.front().send( currentNominee.get() );
+				notify.pop_front();
+			}
+		}
 	}
 }
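The new notifyCheck arm answers at most one queued reply per timer fire, and the interval shrinks as the queue grows, so a full backlog always clears in roughly NOTIFICATION_FULL_CLEAR_TIME seconds. A minimal sketch of the interval computation, using the knob values added elsewhere in this commit (NOTIFICATION_FULL_CLEAR_TIME = 10000.0, MIN_NOTIFICATIONS = 100, MAX_NOTIFICATIONS = 100000); the helper name is illustrative, not part of the codebase:

    #include <algorithm>
    #include <cstddef>

    // One queued reply is answered per interval; the interval shrinks with
    // queue depth, so draining a full queue takes ~fullClearTime seconds.
    double notifyDrainDelay(double fullClearTime, int minNotifications, std::size_t queued) {
        return fullClearTime / std::max<double>(minNotifications, queued);
    }

    // e.g. notifyDrainDelay(10000.0, 100, 100000) == 0.1 seconds between replies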

View File

@@ -520,6 +520,7 @@ struct DDTeamCollection {
 	vector<UID> allServers;
 	ServerStatusMap server_status;
 	int64_t unhealthyServers;
+	std::map<int,int> priority_teams;
 	std::map<UID, Reference<TCServerInfo>> server_info;
 	vector<Reference<TCTeamInfo>> teams;
 	Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
@@ -1276,6 +1277,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
 	Void _ = wait( yield() );
 	TraceEvent("TeamTrackerStarting", self->masterId).detail("Reason", "Initial wait complete (sc)").detail("Team", team->getDesc());
+	self->priority_teams[team->getPriority()]++;
 
 	try {
 		loop {
@@ -1370,6 +1372,12 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
 					team->setPriority( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
 				else
 					team->setPriority( PRIORITY_TEAM_HEALTHY );
+
+				if(lastPriority != team->getPriority()) {
+					self->priority_teams[lastPriority]--;
+					self->priority_teams[team->getPriority()]++;
+				}
+
 				TraceEvent("TeamPriorityChange", self->masterId).detail("Priority", team->getPriority());
 
 				lastZeroHealthy = self->zeroHealthyTeams->get(); //set this again in case it changed from this teams health changing
@@ -1378,23 +1386,25 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
 				for(int i=0; i<shards.size(); i++) {
 					int maxPriority = team->getPriority();
-					auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
-					for( int t=0; t<teams.size(); t++) {
-						if( teams[t].servers.size() && self->server_info.count( teams[t].servers[0] ) ) {
-							auto& info = self->server_info[teams[t].servers[0]];
-							bool found = false;
-							for( int i = 0; i < info->teams.size(); i++ ) {
-								if( info->teams[i]->serverIDs == teams[t].servers ) {
-									maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
-									found = true;
-									break;
+					if(maxPriority < PRIORITY_TEAM_0_LEFT) {
+						auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
+						for( int t=0; t<teams.size(); t++) {
+							if( teams[t].servers.size() && self->server_info.count( teams[t].servers[0] ) ) {
+								auto& info = self->server_info[teams[t].servers[0]];
+								bool found = false;
+								for( int i = 0; i < info->teams.size(); i++ ) {
+									if( info->teams[i]->serverIDs == teams[t].servers ) {
+										maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
+										found = true;
+										break;
+									}
 								}
+								TEST(!found); // A removed team is still associated with a shard in SABTF
+							} else {
+								TEST(teams[t].servers.size()); // A removed server is still associated with a team in SABTF
 							}
-							TEST(!found); // A removed team is still associated with a shard in SABTF
-						} else {
-							TEST(teams[t].servers.size()); // A removed server is still associated with a team in SABTF
 						}
 					}
 				}
@@ -1428,6 +1438,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
 			Void _ = wait( yield() );
 		}
 	} catch(Error& e) {
+		self->priority_teams[team->getPriority()]--;
 		if( team->isHealthy() ) {
 			self->healthyTeamCount--;
 			ASSERT( self->healthyTeamCount >= 0 );
@@ -1994,8 +2005,14 @@ ACTOR Future<Void> dataDistributionTeamCollection(
 				}
 			}
 			when( Void _ = wait( loggingTrigger ) ) {
-				TraceEvent("TotalDataInFlight", masterId).detail("TotalBytes", self.getDebugTotalDataInFlight()).detail("UnhealthyServers", self.unhealthyServers).trackLatest(
-					(cx->dbName.toString() + "/TotalDataInFlight").c_str());
+				int highestPriority = 0;
+				for(auto it : self.priority_teams) {
+					if(it.second > 0) {
+						highestPriority = std::max(highestPriority, it.first);
+					}
+				}
+				TraceEvent("TotalDataInFlight", masterId).detail("Primary", self.primary).detail("TotalBytes", self.getDebugTotalDataInFlight()).detail("UnhealthyServers", self.unhealthyServers)
+					.detail("HighestPriority", highestPriority).trackLatest( self.primary ? "TotalDataInFlight" : "TotalDataInFlightRemote" );
 				loggingTrigger = delay( SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL );
 				self.countHealthyTeams();
 			}
@@ -2181,8 +2198,8 @@ ACTOR Future<Void> dataDistribution(
 				.detail( "HighestPriority", 0 )
 				.trackLatest( format("%s/MovingData", printable(cx->dbName).c_str() ).c_str() );
-			TraceEvent("TotalDataInFlight", mi.id()).detail("TotalBytes", 0)
-				.trackLatest((cx->dbName.toString() + "/TotalDataInFlight").c_str());
+			TraceEvent("TotalDataInFlight", mi.id()).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight");
+			TraceEvent("TotalDataInFlight", mi.id()).detail("Primary", false).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1).trackLatest("TotalDataInFlightRemote");
 
 			Void _ = wait( waitForDataDistributionEnabled(cx) );
 			TraceEvent("DataDistributionEnabled");
@@ -2192,6 +2209,7 @@ ACTOR Future<Void> dataDistribution(
 			ASSERT(configuration.storageTeamSize > 0);
 
 			state PromiseStream<RelocateShard> output;
+			state PromiseStream<RelocateShard> input;
 			state PromiseStream<Promise<int64_t>> getAverageShardBytes;
 			state PromiseStream<GetMetricsRequest> getShardMetrics;
 			state Reference<AsyncVar<bool>> processingUnhealthy( new AsyncVar<bool>(false) );
@@ -2217,6 +2235,7 @@ ACTOR Future<Void> dataDistribution(
 				}
 			}
 			Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
+			actors.push_back(yieldPromiseStream(output.getFuture(), input));
 
 			for(int s=0; s<initData->shards.size() - 1; s++) {
 				KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key);
@@ -2235,8 +2254,8 @@ ACTOR Future<Void> dataDistribution(
 			}
 
 			actors.push_back( pollMoveKeysLock(cx, lock) );
-			actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
-			actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
+			actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
+			actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
 			actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
 			if (configuration.usableRegions > 1) {
 				actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
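The relocation pipeline is now split in two: producers (the tracker and the rebalancers) send RelocateShard requests into output, while the queue consumes them from input, and yieldPromiseStream forwards one stream into the other. A minimal sketch of what such a forwarding actor might look like, assuming flow's ACTOR syntax; the real helper lives elsewhere in the codebase and may differ:

    ACTOR template <class T>
    Future<Void> yieldPromiseStream( FutureStream<T> input, PromiseStream<T> output ) {
        loop {
            T t = waitNext( input );
            output.send( t );
            Void _ = wait( yield() ); // let other actors run between forwarded items
        }
    }

Forwarding through a yield point means a burst of relocation requests cannot monopolize the run loop before the queue gets a chance to process them.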

View File

@@ -210,6 +210,7 @@ Future<Void> dataDistributionTracker(
 	Reference<InitialDataDistribution> const& initData,
 	Database const& cx,
 	PromiseStream<RelocateShard> const& output,
+	Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
 	PromiseStream<GetMetricsRequest> const& getShardMetrics,
 	FutureStream<Promise<int64_t>> const& getAverageShardBytes,
 	Promise<Void> const& readyToStart,
@@ -218,7 +219,8 @@ Future<Void> dataDistributionTracker(
 Future<Void> dataDistributionQueue(
 	Database const& cx,
-	PromiseStream<RelocateShard> const& input,
+	PromiseStream<RelocateShard> const& output,
+	FutureStream<RelocateShard> const& input,
 	PromiseStream<GetMetricsRequest> const& getShardMetrics,
 	Reference<AsyncVar<bool>> const& processingUnhealthy,
 	vector<TeamCollectionInterface> const& teamCollection,
@@ -245,4 +247,4 @@ struct ShardSizeBounds {
 ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
 
 //Determines the maximum shard size based on the size of the database
 int64_t getMaxShardSize( double dbSizeEstimate );

View File

@@ -363,7 +363,8 @@ struct DDQueueData {
 	PromiseStream<RelocateData> relocationComplete;
 	PromiseStream<RelocateData> fetchSourceServersComplete;
 
-	PromiseStream<RelocateShard> input;
+	PromiseStream<RelocateShard> output;
+	FutureStream<RelocateShard> input;
 	PromiseStream<GetMetricsRequest> getShardMetrics;
 
 	double* lastLimited;
@@ -394,10 +395,10 @@ struct DDQueueData {
 	DDQueueData( MasterInterface mi, MoveKeysLock lock, Database cx, std::vector<TeamCollectionInterface> teamCollections,
 		Reference<ShardsAffectedByTeamFailure> sABTF, PromiseStream<Promise<int64_t>> getAverageShardBytes,
-		int teamSize, PromiseStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited, Version recoveryVersion ) :
+		int teamSize, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited, Version recoveryVersion ) :
 		activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ),
 		shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), mi( mi ), lock( lock ),
-		cx( cx ), teamSize( teamSize ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
+		cx( cx ), teamSize( teamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
 		finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited), recoveryVersion(recoveryVersion),
 		suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar<bool>(false) ) {}
@@ -569,10 +570,6 @@ struct DDQueueData {
 	//This function cannot handle relocation requests which split a shard into three pieces
 	void queueRelocation( RelocateData rd, std::set<UID> &serversToLaunchFrom ) {
-		// Update sabtf for changes from DDTracker
-		if( rd.changesBoundaries() )
-			shardsAffectedByTeamFailure->defineShard( rd.keys );
-
 		//TraceEvent("QueueRelocationBegin").detail("Begin", printable(rd.keys.begin)).detail("End", printable(rd.keys.end));
 
 		// remove all items from both queues that are fully contained in the new relocation (i.e. will be overwritten)
@@ -1086,7 +1083,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
 			.detail("SourceTeam", sourceTeam->getDesc())
 			.detail("DestTeam", destTeam->getDesc());
-		self->input.send( RelocateShard( moveShard, priority ) );
+		self->output.send( RelocateShard( moveShard, priority ) );
 		return true;
 	}
@@ -1166,7 +1163,8 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
 ACTOR Future<Void> dataDistributionQueue(
 	Database cx,
-	PromiseStream<RelocateShard> input,
+	PromiseStream<RelocateShard> output,
+	FutureStream<RelocateShard> input,
 	PromiseStream<GetMetricsRequest> getShardMetrics,
 	Reference<AsyncVar<bool>> processingUnhealthy,
 	std::vector<TeamCollectionInterface> teamCollections,
@@ -1178,7 +1176,7 @@ ACTOR Future<Void> dataDistributionQueue(
 	double* lastLimited,
 	Version recoveryVersion)
 {
-	state DDQueueData self( mi, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, input, getShardMetrics, lastLimited, recoveryVersion );
+	state DDQueueData self( mi, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, lastLimited, recoveryVersion );
 	state std::set<UID> serversToLaunchFrom;
 	state KeyRange keysToLaunchFrom;
 	state RelocateData launchData;
@@ -1213,7 +1211,7 @@ ACTOR Future<Void> dataDistributionQueue(
 			ASSERT( launchData.startTime == -1 && keysToLaunchFrom.empty() );
 			choose {
-				when ( RelocateShard rs = waitNext( self.input.getFuture() ) ) {
+				when ( RelocateShard rs = waitNext( self.input ) ) {
 					bool wasEmpty = serversToLaunchFrom.empty();
 					self.queueRelocation( rs, serversToLaunchFrom );
 					if(wasEmpty && !serversToLaunchFrom.empty())

View File

@@ -74,14 +74,15 @@ struct DataDistributionTracker {
 	// CapacityTracker
 	PromiseStream<RelocateShard> output;
+	Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
 
 	Promise<Void> readyToStart;
 	Reference<AsyncVar<bool>> anyZeroHealthyTeams;
 
-	DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
+	DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
 		: cx(cx), masterId( masterId ), dbSizeEstimate( new AsyncVar<int64_t>() ),
 		  maxShardSize( new AsyncVar<Optional<int64_t>>() ),
-		  sizeChanges(false), readyToStart(readyToStart), output( output ), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
+		  sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
 
 	~DataDistributionTracker()
 	{
@@ -357,10 +358,16 @@ ACTOR Future<Void> shardSplitter(
 		for( int i = numShards-1; i > skipRange; i-- )
 			restartShardTrackers( self, KeyRangeRef(splitKeys[i], splitKeys[i+1]) );
 
-		for( int i = 0; i < skipRange; i++ )
-			self->output.send( RelocateShard( KeyRangeRef(splitKeys[i], splitKeys[i+1]), PRIORITY_SPLIT_SHARD) );
-		for( int i = numShards-1; i > skipRange; i-- )
-			self->output.send( RelocateShard( KeyRangeRef(splitKeys[i], splitKeys[i+1]), PRIORITY_SPLIT_SHARD) );
+		for( int i = 0; i < skipRange; i++ ) {
+			KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
+			self->shardsAffectedByTeamFailure->defineShard( r );
+			self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
+		}
+		for( int i = numShards-1; i > skipRange; i-- ) {
+			KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
+			self->shardsAffectedByTeamFailure->defineShard( r );
+			self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
+		}
 
 		self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().bytes ) );
 	} else {
@@ -461,6 +468,7 @@ Future<Void> shardMerger(
 		.detail("TrackerID", trackerId);
 
 	restartShardTrackers( self, mergeRange, endingStats );
+	self->shardsAffectedByTeamFailure->defineShard( mergeRange );
 	self->output.send( RelocateShard( mergeRange, PRIORITY_MERGE_SHARD ) );
 
 	// We are about to be cancelled by the call to restartShardTrackers
@@ -661,13 +669,14 @@ ACTOR Future<Void> dataDistributionTracker(
 	Reference<InitialDataDistribution> initData,
 	Database cx,
 	PromiseStream<RelocateShard> output,
+	Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
 	PromiseStream<GetMetricsRequest> getShardMetrics,
 	FutureStream<Promise<int64_t>> getAverageShardBytes,
 	Promise<Void> readyToStart,
 	Reference<AsyncVar<bool>> anyZeroHealthyTeams,
 	UID masterId)
 {
-	state DataDistributionTracker self(cx, masterId, readyToStart, output, anyZeroHealthyTeams);
+	state DataDistributionTracker self(cx, masterId, readyToStart, output, shardsAffectedByTeamFailure, anyZeroHealthyTeams);
 	state Future<Void> loggingTrigger = Void();
 	try {
 		Void _ = wait( trackInitialShards( &self, initData ) );

View File

@@ -63,6 +63,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
 	init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
 	init( VERSIONS_PER_BATCH, VERSIONS_PER_SECOND/20 ); if( randomize && BUGGIFY ) VERSIONS_PER_BATCH = std::max<int64_t>(1,VERSIONS_PER_SECOND/1000);
+	init( CONCURRENT_LOG_ROUTER_READS, 1 );
 
 	// Data distribution queue
 	init( HEALTH_POLL_TIME, 1.0 );
@@ -211,6 +212,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	// Leader election
 	bool longLeaderElection = randomize && BUGGIFY;
 	init( MAX_NOTIFICATIONS, 100000 );
+	init( MIN_NOTIFICATIONS, 100 );
+	init( NOTIFICATION_FULL_CLEAR_TIME, 10000.0 );
 	init( CANDIDATE_MIN_DELAY, 0.05 );
 	init( CANDIDATE_MAX_DELAY, 1.0 );
 	init( CANDIDATE_GROWTH_RATE, 1.2 );
@@ -322,7 +325,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
 
 	bool smallTlogTarget = randomize && BUGGIFY;
-	init( TARGET_BYTES_PER_TLOG, 2000e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
+	init( TARGET_BYTES_PER_TLOG, 2400e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
 	init( SPRING_BYTES_TLOG, 400e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG = 200e3;
 	init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0;
 	init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3;
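The TARGET_BYTES_PER_TLOG bump is presumably the source of the release note above about transaction log memory: 2400e6 - 2000e6 = 400e6 bytes, i.e. the documented 400MB increase.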

View File

@@ -67,6 +67,7 @@ public:
 	int PARALLEL_GET_MORE_REQUESTS;
 	int64_t MAX_QUEUE_COMMIT_BYTES;
 	int64_t VERSIONS_PER_BATCH;
+	int CONCURRENT_LOG_ROUTER_READS;
 
 	// Data distribution queue
 	double HEALTH_POLL_TIME;
@@ -157,6 +158,8 @@ public:
 	// Leader election
 	int MAX_NOTIFICATIONS;
+	int MIN_NOTIFICATIONS;
+	double NOTIFICATION_FULL_CLEAR_TIME;
 	double CANDIDATE_MIN_DELAY;
 	double CANDIDATE_MAX_DELAY;
 	double CANDIDATE_GROWTH_RATE;

View File

@@ -69,7 +69,7 @@ ACTOR Future<int64_t> getDataInFlight( Database cx, WorkerInterface masterWorker
 	try {
 		TraceEvent("DataInFlight").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
 		TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
-			EventLogRequest( StringRef( cx->dbName.toString() + "/TotalDataInFlight" ) ) ), 1.0 ) );
+			EventLogRequest( LiteralStringRef("TotalDataInFlight") ) ), 1.0 ) );
 		int64_t dataInFlight;
 		sscanf(md.getValue("TotalBytes").c_str(), "%lld", &dataInFlight);
 		return dataInFlight;

View File

@@ -574,11 +574,13 @@ ACTOR static Future<StatusObject> processStatusFetcher(
 	}
 
 	state std::vector<std::pair<StorageServerInterface, TraceEventFields>>::iterator ss;
-	state std::map<NetworkAddress, int64_t> ssLag;
+	state std::map<NetworkAddress, double> ssLag;
 	for(ss = storageServers.begin(); ss != storageServers.end(); ++ss) {
 		StatusObject const& roleStatus = roles.addRole( "storage", ss->first, ss->second, maxTLogVersion );
-		if(roleStatus.count("data_version_lag") > 0) {
-			ssLag[ss->first.address()] = roleStatus.at("data_version_lag").get_int64();
+		JSONDoc doc(roleStatus);
+		double lagSeconds;
+		if(doc.tryGet("data_lag.seconds", lagSeconds)) {
+			ssLag[ss->first.address()] = lagSeconds;
 		}
 		Void _ = wait(yield());
 	}
@@ -754,8 +756,8 @@ ACTOR static Future<StatusObject> processStatusFetcher(
 			messages.push_back(tracefileOpenErrorMap[strAddress]);
 		}
 
-		if(ssLag[address] > 60 * SERVER_KNOBS->VERSIONS_PER_SECOND) {
-			messages.push_back(makeMessage("storage_server_lagging", format("Storage server lagging by %ld seconds.", ssLag[address] / SERVER_KNOBS->VERSIONS_PER_SECOND).c_str()));
+		if(ssLag[address] >= 60) {
+			messages.push_back(makeMessage("storage_server_lagging", format("Storage server lagging by %ld seconds.", (int64_t)ssLag[address]).c_str()));
 		}
 
 		// Store the message array into the status object that represents the worker process
@@ -1068,7 +1070,6 @@ static StatusObject configurationFetcher(Optional<DatabaseConfiguration> conf, S
 }
 
 ACTOR static Future<StatusObject> dataStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, std::string dbName, int *minReplicasRemaining) {
-	state StatusObject stateSectionObj;
 	state StatusObject statusObjData;
 
 	try {
@@ -1077,96 +1078,135 @@ ACTOR static Future<StatusObject> dataStatusFetcher(std::pair<WorkerInterface, P
 		// TODO: Should this be serial?
 		futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/DDTrackerStarting"))), 1.0));
 		futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/DDTrackerStats"))), 1.0));
+		futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/MovingData"))), 1.0));
+		futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlight"))), 1.0));
+		futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlightRemote"))), 1.0));
 
 		std::vector<TraceEventFields> dataInfo = wait(getAll(futures));
 
 		TraceEventFields startingStats = dataInfo[0];
-		state TraceEventFields dataStats = dataInfo[1];
+		TraceEventFields dataStats = dataInfo[1];
 
 		if (startingStats.size() && startingStats.getValue("State") != "Active") {
+			StatusObject stateSectionObj;
 			stateSectionObj["name"] = "initializing";
 			stateSectionObj["description"] = "(Re)initializing automatic data distribution";
+			statusObjData["state"] = stateSectionObj;
+			return statusObjData;
 		}
-		else {
-			state TraceEventFields md = wait(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/MovingData"))), 1.0));
 
-			// If we have a MovingData message, parse it.
-			if (md.size())
-			{
-				int64_t partitionsInQueue = parseInt64(md.getValue("InQueue"));
-				int64_t partitionsInFlight = parseInt64(md.getValue("InFlight"));
-				int64_t averagePartitionSize = parseInt64(md.getValue("AverageShardSize"));
-				int64_t totalBytesWritten = parseInt64(md.getValue("BytesWritten"));
-				int highestPriority = parseInt(md.getValue("HighestPriority"));
+		TraceEventFields md = dataInfo[2];
 
-				if( averagePartitionSize >= 0 ) {
-					StatusObject moving_data;
-					moving_data["in_queue_bytes"] = partitionsInQueue * averagePartitionSize;
-					moving_data["in_flight_bytes"] = partitionsInFlight * averagePartitionSize;
-					moving_data["total_written_bytes"] = totalBytesWritten;
+		// If we have a MovingData message, parse it.
+		if (md.size())
+		{
+			int64_t partitionsInQueue = parseInt64(md.getValue("InQueue"));
+			int64_t partitionsInFlight = parseInt64(md.getValue("InFlight"));
+			int64_t averagePartitionSize = parseInt64(md.getValue("AverageShardSize"));
+			int64_t totalBytesWritten = parseInt64(md.getValue("BytesWritten"));
+			int highestPriority = parseInt(md.getValue("HighestPriority"));
 
-					// TODO: moving_data["rate_bytes"] = makeCounter(hz, c, r);
-					statusObjData["moving_data"] = moving_data;
+			if( averagePartitionSize >= 0 ) {
+				StatusObject moving_data;
+				moving_data["in_queue_bytes"] = partitionsInQueue * averagePartitionSize;
+				moving_data["in_flight_bytes"] = partitionsInFlight * averagePartitionSize;
+				moving_data["total_written_bytes"] = totalBytesWritten;
+				moving_data["highest_priority"] = highestPriority;
 
-					statusObjData["average_partition_size_bytes"] = averagePartitionSize;
-				}
+				// TODO: moving_data["rate_bytes"] = makeCounter(hz, c, r);
+				statusObjData["moving_data"] = moving_data;
 
-				if (highestPriority >= PRIORITY_TEAM_0_LEFT) {
-					stateSectionObj["healthy"] = false;
-					stateSectionObj["name"] = "missing_data";
-					stateSectionObj["description"] = "No replicas remain of some data";
-					stateSectionObj["min_replicas_remaining"] = 0;
-					*minReplicasRemaining = 0;
-				}
-				else if (highestPriority >= PRIORITY_TEAM_1_LEFT) {
-					stateSectionObj["healthy"] = false;
-					stateSectionObj["name"] = "healing";
-					stateSectionObj["description"] = "Only one replica remains of some data";
-					stateSectionObj["min_replicas_remaining"] = 1;
-					*minReplicasRemaining = 1;
-				}
-				else if (highestPriority >= PRIORITY_TEAM_2_LEFT) {
-					stateSectionObj["healthy"] = false;
-					stateSectionObj["name"] = "healing";
-					stateSectionObj["description"] = "Only two replicas remain of some data";
-					stateSectionObj["min_replicas_remaining"] = 2;
-					*minReplicasRemaining = 2;
-				}
-				else if (highestPriority >= PRIORITY_TEAM_UNHEALTHY) {
-					stateSectionObj["healthy"] = false;
-					stateSectionObj["name"] = "healing";
-					stateSectionObj["description"] = "Restoring replication factor";
-				}
-				else if (highestPriority >= PRIORITY_MERGE_SHARD) {
-					stateSectionObj["healthy"] = true;
-					stateSectionObj["name"] = "healthy_repartitioning";
-					stateSectionObj["description"] = "Repartitioning.";
-				}
-				else if (highestPriority >= PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER) {
-					stateSectionObj["healthy"] = true;
-					stateSectionObj["name"] = "healthy_removing_server";
-					stateSectionObj["description"] = "Removing storage server";
-				}
-				else if (highestPriority >= PRIORITY_REBALANCE_SHARD) {
-					stateSectionObj["healthy"] = true;
-					stateSectionObj["name"] = "healthy_rebalancing";
-					stateSectionObj["description"] = "Rebalancing";
-				}
-				else if (highestPriority >= 0) {
-					stateSectionObj["healthy"] = true;
-					stateSectionObj["name"] = "healthy";
-				}
+				statusObjData["average_partition_size_bytes"] = averagePartitionSize;
 			}
+		}
 
-			if (dataStats.size())
-			{
-				int64_t totalDBBytes = parseInt64(dataStats.getValue("TotalSizeBytes"));
-				statusObjData["total_kv_size_bytes"] = totalDBBytes;
-				int shards = parseInt(dataStats.getValue("Shards"));
-				statusObjData["partitions_count"] = shards;
-			}
+		if (dataStats.size())
+		{
+			int64_t totalDBBytes = parseInt64(dataStats.getValue("TotalSizeBytes"));
+			statusObjData["total_kv_size_bytes"] = totalDBBytes;
+			int shards = parseInt(dataStats.getValue("Shards"));
+			statusObjData["partitions_count"] = shards;
 		}
+
+		StatusArray teamTrackers;
+		for(int i = 0; i < 2; i++) {
+			TraceEventFields inFlight = dataInfo[3 + i];
+			if (!inFlight.size()) {
+				continue;
+			}
+
+			bool primary = parseInt(inFlight.getValue("Primary"));
+			int64_t totalDataInFlight = parseInt64(inFlight.getValue("TotalBytes"));
+			int unhealthyServers = parseInt(inFlight.getValue("UnhealthyServers"));
+			int highestPriority = parseInt(inFlight.getValue("HighestPriority"));
+
+			StatusObject team_tracker;
+			team_tracker["primary"] = primary;
+			team_tracker["in_flight_bytes"] = totalDataInFlight;
+			team_tracker["unhealthy_servers"] = unhealthyServers;
+
+			StatusObject stateSectionObj;
+			if (highestPriority >= PRIORITY_TEAM_0_LEFT) {
+				stateSectionObj["healthy"] = false;
+				stateSectionObj["name"] = "missing_data";
+				stateSectionObj["description"] = "No replicas remain of some data";
+				stateSectionObj["min_replicas_remaining"] = 0;
+				if(primary) {
+					*minReplicasRemaining = 0;
+				}
+			}
+			else if (highestPriority >= PRIORITY_TEAM_1_LEFT) {
+				stateSectionObj["healthy"] = false;
+				stateSectionObj["name"] = "healing";
+				stateSectionObj["description"] = "Only one replica remains of some data";
+				stateSectionObj["min_replicas_remaining"] = 1;
+				if(primary) {
+					*minReplicasRemaining = 1;
+				}
+			}
+			else if (highestPriority >= PRIORITY_TEAM_2_LEFT) {
+				stateSectionObj["healthy"] = false;
+				stateSectionObj["name"] = "healing";
+				stateSectionObj["description"] = "Only two replicas remain of some data";
+				stateSectionObj["min_replicas_remaining"] = 2;
+				if(primary) {
+					*minReplicasRemaining = 2;
+				}
+			}
+			else if (highestPriority >= PRIORITY_TEAM_UNHEALTHY) {
+				stateSectionObj["healthy"] = false;
+				stateSectionObj["name"] = "healing";
+				stateSectionObj["description"] = "Restoring replication factor";
+			}
+			else if (highestPriority >= PRIORITY_MERGE_SHARD) {
+				stateSectionObj["healthy"] = true;
+				stateSectionObj["name"] = "healthy_repartitioning";
+				stateSectionObj["description"] = "Repartitioning.";
+			}
+			else if (highestPriority >= PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER) {
+				stateSectionObj["healthy"] = true;
+				stateSectionObj["name"] = "healthy_removing_server";
+				stateSectionObj["description"] = "Removing storage server";
+			}
+			else if (highestPriority >= PRIORITY_REBALANCE_SHARD) {
+				stateSectionObj["healthy"] = true;
+				stateSectionObj["name"] = "healthy_rebalancing";
+				stateSectionObj["description"] = "Rebalancing";
+			}
+			else if (highestPriority >= 0) {
+				stateSectionObj["healthy"] = true;
+				stateSectionObj["name"] = "healthy";
+			}
+
+			if(!stateSectionObj.empty()) {
+				team_tracker["state"] = stateSectionObj;
+				teamTrackers.push_back(team_tracker);
+				if(primary) {
+					statusObjData["state"] = stateSectionObj;
+				}
+			}
+		}
+		statusObjData["team_trackers"] = teamTrackers;
 	}
 	catch (Error &e) {
 		if (e.code() == error_code_actor_cancelled)
@ -1175,9 +1215,6 @@ ACTOR static Future<StatusObject> dataStatusFetcher(std::pair<WorkerInterface, P
// from the "cluster" perspective - from the client perspective it is not but that is indicated elsewhere. // from the "cluster" perspective - from the client perspective it is not but that is indicated elsewhere.
} }
if (!stateSectionObj.empty())
statusObjData["state"] = stateSectionObj;
return statusObjData; return statusObjData;
} }
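
For orientation, the team-tracker hunk above boils down to a priority-to-state cascade. Here is a minimal standalone sketch of that classification, not FDB source: the PRIORITY_* values are illustrative placeholders (the real constants are defined elsewhere in fdbserver and may differ), and -1 stands in for "min_replicas_remaining not reported".

#include <string>
#include <utility>

namespace example {
	// Placeholder priorities, ordered the same way as the diff's cascade.
	const int PRIORITY_TEAM_0_LEFT = 999;
	const int PRIORITY_TEAM_1_LEFT = 800;
	const int PRIORITY_TEAM_2_LEFT = 700;
	const int PRIORITY_TEAM_UNHEALTHY = 600;
	const int PRIORITY_MERGE_SHARD = 340;
	const int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER = 150;
	const int PRIORITY_REBALANCE_SHARD = 100;

	// Returns (state name, min_replicas_remaining or -1), mirroring the
	// first-match-wins order of the if/else chain above.
	std::pair<std::string, int> classify(int highestPriority) {
		if (highestPriority >= PRIORITY_TEAM_0_LEFT) return { "missing_data", 0 };
		if (highestPriority >= PRIORITY_TEAM_1_LEFT) return { "healing", 1 };
		if (highestPriority >= PRIORITY_TEAM_2_LEFT) return { "healing", 2 };
		if (highestPriority >= PRIORITY_TEAM_UNHEALTHY) return { "healing", -1 };
		if (highestPriority >= PRIORITY_MERGE_SHARD) return { "healthy_repartitioning", -1 };
		if (highestPriority >= PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER) return { "healthy_removing_server", -1 };
		if (highestPriority >= PRIORITY_REBALANCE_SHARD) return { "healthy_rebalancing", -1 };
		return { "healthy", -1 };
	}
}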

View File

@@ -72,7 +72,7 @@ struct StorageMetricSample {
 		}

 		// If we didn't return above, we didn't find anything.
-		TraceEvent(SevWarnAlways, "CannotSplitLastSampleKey").detail("Range", printable(range)).detail("Offset", offset);
+		TraceEvent(SevWarn, "CannotSplitLastSampleKey").detail("Range", printable(range)).detail("Offset", offset);
 		return front ? range.end : range.begin;
 	}
 };

View File

@@ -269,14 +269,15 @@ struct TLogData : NonCopyable {
 	Future<Void> updatePersist; //SOMEDAY: integrate the recovery and update storage so that only one of them is committing to persistant data.
 	PromiseStream<Future<Void>> sharedActors;
-	bool terminated;
+	Promise<Void> terminated;
+	FlowLock concurrentLogRouterReads;

 	TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
 			: dbgid(dbgid), instanceID(g_random->randomUniqueID().first()),
 			  persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
 			  dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0),
 			  diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false),
-			  bytesInput(0), bytesDurable(0), updatePersist(Void()), terminated(false)
+			  bytesInput(0), bytesDurable(0), updatePersist(Void()), concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS)
 		{
 		}
 };

@@ -287,19 +288,17 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 		bool nothingPersistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
 		bool poppedRecently; // `popped` has changed since last updatePersistentData
 		Version popped; // see popped version tracking contract below
-		bool updateVersionSizes;
 		bool unpoppedRecovered;
 		Tag tag;

-		TagData( Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), popped(popped), poppedRecently(poppedRecently), unpoppedRecovered(unpoppedRecovered), updateVersionSizes(tag != txsTag) {}
+		TagData( Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), popped(popped), poppedRecently(poppedRecently), unpoppedRecovered(unpoppedRecovered) {}

-		TagData(TagData&& r) noexcept(true) : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), updateVersionSizes(r.updateVersionSizes), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
+		TagData(TagData&& r) noexcept(true) : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
 		void operator= (TagData&& r) noexcept(true) {
 			versionMessages = std::move(r.versionMessages);
 			nothingPersistent = r.nothingPersistent;
 			poppedRecently = r.poppedRecently;
 			popped = r.popped;
-			updateVersionSizes = r.updateVersionSizes;
 			tag = r.tag;
 			unpoppedRecovered = r.unpoppedRecovered;
 		}

@@ -308,15 +307,17 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 		ACTOR Future<Void> eraseMessagesBefore( TagData *self, Version before, int64_t* gBytesErased, Reference<LogData> tlogData, int taskID ) {
 			while(!self->versionMessages.empty() && self->versionMessages.front().first < before) {
 				Version version = self->versionMessages.front().first;
-				std::pair<int, int> &sizes = tlogData->version_sizes[version];
+				std::pair<int,int> &sizes = tlogData->version_sizes[version];
 				int64_t messagesErased = 0;

 				while(!self->versionMessages.empty() && self->versionMessages.front().first == version) {
 					auto const& m = self->versionMessages.front();
 					++messagesErased;

-					if(self->updateVersionSizes) {
+					if(self->tag != txsTag) {
 						sizes.first -= m.second.expectedSize();
+					} else {
+						sizes.second -= m.second.expectedSize();
 					}

 					self->versionMessages.pop_front();
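
The `updateVersionSizes` flag can go away because each version_sizes entry now carries two counters. A minimal illustration of that bookkeeping, under the assumption the branches above suggest (.first for ordinary tags, .second for txsTag); this is a sketch, not the FDB types:

#include <utility>

// Per-version byte accounting as a plain pair.
typedef std::pair<int,int> VersionSize;

// Commit path: add the message's bytes to the matching slot.
void onCommit(VersionSize& sizes, bool isTxsTag, int bytes) {
	(isTxsTag ? sizes.second : sizes.first) += bytes;
}

// Erase path: remove them from the same slot, mirroring the hunk above.
void onErase(VersionSize& sizes, bool isTxsTag, int bytes) {
	(isTxsTag ? sizes.second : sizes.first) -= bytes;
}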
@@ -410,10 +411,11 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	int8_t locality;
 	UID recruitmentID;
 	std::set<Tag> allTags;
+	Future<Void> terminated;

 	explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(1), logId(interf.id()),
 			cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), recruitmentID(recruitmentID),
-			logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()),
+			logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
 			// These are initialized differently on init() or recovery
 			recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
 			logRouterPopToVersion(0), locality(tagLocalityInvalid)

@@ -439,13 +441,14 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	}

 	~LogData() {
-		tLogData->bytesDurable += bytesInput.getValue() - bytesDurable.getValue();
-		TraceEvent("TLogBytesWhenRemoved", logId).detail("SharedBytesInput", tLogData->bytesInput).detail("SharedBytesDurable", tLogData->bytesDurable).detail("LocalBytesInput", bytesInput.getValue()).detail("LocalBytesDurable", bytesDurable.getValue());
-		ASSERT_ABORT(tLogData->bytesDurable <= tLogData->bytesInput);
 		endRole(logId, "TLog", "Error", true);

-		if(!tLogData->terminated) {
+		if(!terminated.isReady()) {
+			tLogData->bytesDurable += bytesInput.getValue() - bytesDurable.getValue();
+			TraceEvent("TLogBytesWhenRemoved", logId).detail("SharedBytesInput", tLogData->bytesInput).detail("SharedBytesDurable", tLogData->bytesDurable).detail("LocalBytesInput", bytesInput.getValue()).detail("LocalBytesDurable", bytesDurable.getValue());
+			ASSERT_ABORT(tLogData->bytesDurable <= tLogData->bytesInput);
+
 			Key logIdKey = BinaryWriter::toValue(logId,Unversioned());
 			tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistCurrentVersionKeys.begin)) );
 			tLogData->persistentData->clear( singleKeyRange(logIdKey.withPrefix(persistKnownCommittedVersionKeys.begin)) );
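
The bool-to-Promise change lets each LogData snapshot the shared termination signal at construction and test it later in its destructor, instead of reading a flag on TLogData. A rough plain-C++ analogy of that pattern, with std::promise standing in for flow's Promise (flow's actual types and scheduling differ):

#include <chrono>
#include <future>
#include <iostream>

struct SharedState {
	std::promise<void> terminated;
	// Shared handle every Log copies at construction.
	std::shared_future<void> terminatedFuture{ terminated.get_future().share() };
};

struct Log {
	std::shared_future<void> terminated;
	explicit Log(SharedState& s) : terminated(s.terminatedFuture) {}
	~Log() {
		// A zero-timeout wait_for plays the role of flow's isReady().
		bool dead = terminated.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
		if (!dead)
			std::cout << "normal removal: flush counters, clear persisted keys\n";
		else
			std::cout << "shared TLog terminating: skip cleanup\n";
	}
};

int main() {
	SharedState shared;
	{ Log a(shared); }             // destroyed before termination: cleans up
	shared.terminated.set_value(); // analogous to self.terminated.send(Void())
	{ Log b(shared); }             // destroyed after termination: skips cleanup
}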
@@ -647,8 +650,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	}

 	state Reference<LogData> logData = self->id_data[self->queueOrder.front()];
-	state Version prevVersion = 0;
-	state Version nextVersion = 0;
+	state Version nextVersion = logData->version.get();
 	state int totalSize = 0;

 	state int tagLocality = 0;

@@ -659,33 +661,13 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	if (self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD) {
 		while(logData->persistentDataDurableVersion != logData->version.get()) {
 			totalSize = 0;
-			std::vector<std::pair<std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator, std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator>> iters;
-			for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
-				for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
-					tagData = logData->tag_data[tagLocality][tagId];
-					if(tagData) {
-						iters.push_back(std::make_pair(tagData->versionMessages.begin(), tagData->versionMessages.end()));
-					}
-				}
-			}
-
-			nextVersion = 0;
-			while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT || nextVersion <= logData->persistentDataVersion ) {
-				nextVersion = logData->version.get();
-				for( auto &it : iters )
-					if(it.first != it.second)
-						nextVersion = std::min( nextVersion, it.first->first + 1 );
-
-				if(nextVersion == logData->version.get())
-					break;
-
-				for( auto &it : iters ) {
-					while (it.first != it.second && it.first->first < nextVersion) {
-						totalSize += it.first->second.expectedSize();
-						++it.first;
-					}
-				}
+			Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
+			nextVersion = logData->version.get();
+			while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end() )
+			{
+				totalSize += sizeItr->value.first + sizeItr->value.second;
+				++sizeItr;
+				nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key;
 			}

 			Void _ = wait( logData->queueCommittedVersion.whenAtLeast( nextVersion ) );
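
The rewritten spill path no longer walks every tag's message deque; it scans the per-version size map in version order until the byte budget is exhausted. A self-contained sketch of that scan, with std::map standing in for flow's Map and an arbitrary budget:

#include <cstdint>
#include <map>
#include <utility>

typedef int64_t Version;

// Sum (ordinary tag bytes + txs bytes) per version until the limit is
// hit; return the version to persist up to, as in the hunk above.
Version pickNextVersion(const std::map<Version, std::pair<int,int>>& version_sizes,
                        Version currentVersion, int byteLimit) {
	int totalSize = 0;
	Version nextVersion = currentVersion;
	std::map<Version, std::pair<int,int>>::const_iterator it = version_sizes.begin();
	while (totalSize < byteLimit && it != version_sizes.end()) {
		totalSize += it->second.first + it->second.second;
		++it;
		nextVersion = (it == version_sizes.end()) ? currentVersion : it->first;
	}
	return nextVersion;
}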
@@ -714,34 +696,15 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
 	}
 	else if(logData->initialized) {
 		ASSERT(self->queueOrder.size() == 1);
-		state Map<Version, std::pair<int, int>>::iterator sizeItr = logData->version_sizes.begin();
+		Map<Version, std::pair<int,int>>::iterator sizeItr = logData->version_sizes.begin();
 		while( totalSize < SERVER_KNOBS->UPDATE_STORAGE_BYTE_LIMIT && sizeItr != logData->version_sizes.end()
 				&& (logData->bytesInput.getValue() - logData->bytesDurable.getValue() - totalSize >= SERVER_KNOBS->TLOG_SPILL_THRESHOLD || sizeItr->value.first == 0) )
 		{
-			Void _ = wait( yield(TaskUpdateStorage) );
-
+			totalSize += sizeItr->value.first + sizeItr->value.second;
 			++sizeItr;
 			nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key;
-
-			for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
-				for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
-					tagData = logData->tag_data[tagLocality][tagId];
-					if(tagData) {
-						auto it = std::lower_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(prevVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
-						for(; it != tagData->versionMessages.end() && it->first < nextVersion; ++it) {
-							totalSize += it->second.expectedSize();
-						}
-						Void _ = wait(yield(TaskUpdateStorage));
-					}
-				}
-			}
-
-			prevVersion = nextVersion;
 		}

-		nextVersion = std::max<Version>(nextVersion, logData->persistentDataVersion);
-
 		//TraceEvent("UpdateStorageVer", logData->logId).detail("NextVersion", nextVersion).detail("PersistentDataVersion", logData->persistentDataVersion).detail("TotalSize", totalSize);

 		Void _ = wait( logData->queueCommittedVersion.whenAtLeast( nextVersion ) );

@@ -779,7 +742,8 @@ void commitMessages( Reference<LogData> self, Version version, const std::vector
 	// way to do the memory allocation right as we receive the messages in the network layer.

 	int64_t addedBytes = 0;
-	int64_t expectedBytes = 0;
+	int expectedBytes = 0;
+	int txsBytes = 0;

 	if(!taggedMessages.size()) {
 		return;

@@ -839,6 +803,8 @@ void commitMessages( Reference<LogData> self, Version version, const std::vector
 			}
 			if (tag != txsTag) {
 				expectedBytes += tagData->versionMessages.back().second.expectedSize();
+			} else {
+				txsBytes += tagData->versionMessages.back().second.expectedSize();
 			}

 			// The factor of VERSION_MESSAGES_OVERHEAD is intended to be an overestimate of the actual memory used to store this data in a std::deque.

@@ -854,7 +820,7 @@ void commitMessages( Reference<LogData> self, Version version, const std::vector
 	self->messageBlocks.push_back( std::make_pair(version, block) );
 	addedBytes += int64_t(block.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR;

-	self->version_sizes[version] = make_pair(expectedBytes, expectedBytes);
+	self->version_sizes[version] = std::make_pair(expectedBytes, txsBytes);
 	self->bytesInput += addedBytes;
 	bytesInput += addedBytes;
@@ -991,6 +957,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		Void _ = wait( delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()) );
 	}

+	if( req.tag.locality == tagLocalityLogRouter ) {
+		Void _ = wait( self->concurrentLogRouterReads.take() );
+		state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
+		Void _ = wait( delay(0.0, TaskLowPriority) );
+	}
+
 	Version poppedVer = poppedVersion(logData, req.tag);
 	if(poppedVer > req.begin) {
 		TLogPeekReply rep;
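
The new guard bounds how many log-router peeks run at once, with the cap coming from the CONCURRENT_LOG_ROUTER_READS knob. For readers unfamiliar with FlowLock, here is a rough thread-based analogy of its take()/Releaser pattern; flow itself is single-threaded and actor-based, so this is an approximation, not flow's implementation:

#include <condition_variable>
#include <mutex>

class FlowLockLike {
	std::mutex m;
	std::condition_variable cv;
	int permits;
public:
	explicit FlowLockLike(int n) : permits(n) {}
	// Block until a permit is available, then consume it.
	void take() {
		std::unique_lock<std::mutex> lk(m);
		cv.wait(lk, [&]{ return permits > 0; });
		--permits;
	}
	void release() {
		{ std::lock_guard<std::mutex> lk(m); ++permits; }
		cv.notify_one();
	}
	// RAII helper mirroring FlowLock::Releaser: returns the permit when
	// it goes out of scope, even on early return or exception.
	struct Releaser {
		FlowLockLike& lock;
		explicit Releaser(FlowLockLike& l) : lock(l) {}
		~Releaser() { lock.release(); }
	};
};

// Usage sketch: lock.take(); FlowLockLike::Releaser r(lock);  // auto-release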
@@ -1518,7 +1490,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
 		commitMessages(logData, ver, messages, self->bytesInput);

-		if(self->terminated) {
+		if(self->terminated.isSet()) {
 			return Void();
 		}

@@ -1555,7 +1527,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
 			logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped());
 		}

-		if(self->terminated) {
+		if(self->terminated.isSet()) {
 			return Void();
 		}

@@ -1775,7 +1747,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
 		for(auto &kv : data) {
 			Tag tag = decodeTagPoppedKey(rawId, kv.key);
 			Version popped = decodeTagPoppedValue(kv.value);
-			TraceEvent("TLogRestorePop", logData->logId).detail("Tag", tag.toString()).detail("To", popped);
+			TraceEvent("TLogRestorePopped", logData->logId).detail("Tag", tag.toString()).detail("To", popped);
 			auto tagData = logData->getTagData(tag);
 			ASSERT( !tagData );
 			logData->createTagData(tag, popped, false, false, false);

@@ -2078,7 +2050,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
 			}
 		}
 	} catch (Error& e) {
-		self.terminated = true;
+		self.terminated.send(Void());
 		TraceEvent("TLogError", tlogId).error(e, true);
 		endRole(tlogId, "SharedTLog", "Error", true);
 		if(recovered.canBeSet()) recovered.send(Void());

View File

@@ -1067,19 +1067,19 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
 		TraceEvent("MasterRecoveryState", self->dbgid)
 			.detail("StatusCode", RecoveryStatus::fully_recovered)
 			.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
-			.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
+			.trackLatest("MasterRecoveryState");
 	} else if( !newState.oldTLogData.size() && self->recoveryState < RecoveryState::STORAGE_RECOVERED ) {
 		self->recoveryState = RecoveryState::STORAGE_RECOVERED;
 		TraceEvent("MasterRecoveryState", self->dbgid)
 			.detail("StatusCode", RecoveryStatus::storage_recovered)
 			.detail("Status", RecoveryStatus::names[RecoveryStatus::storage_recovered])
-			.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
+			.trackLatest("MasterRecoveryState");
 	} else if( allLogs && self->recoveryState < RecoveryState::ALL_LOGS_RECRUITED ) {
 		self->recoveryState = RecoveryState::ALL_LOGS_RECRUITED;
 		TraceEvent("MasterRecoveryState", self->dbgid)
 			.detail("StatusCode", RecoveryStatus::all_logs_recruited)
 			.detail("Status", RecoveryStatus::names[RecoveryStatus::all_logs_recruited])
-			.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
+			.trackLatest("MasterRecoveryState");
 	}

 	if(newState.oldTLogData.size() && self->configuration.repopulateRegionAntiQuorum > 0 && self->logSystem->remoteStorageRecovered()) {

View File

@@ -68,7 +68,6 @@ enum {
 	TaskDataDistribution = 3500,
 	TaskDiskWrite = 3010,
 	TaskUpdateStorage = 3000,
-	TaskBatchCopy = 2900,
 	TaskLowPriority = 2000,

 	TaskMinPriority = 1000
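
Flow task priorities are plain integers where, to my understanding, a numerically higher value is dispatched first; removing TaskBatchCopy simply retires the 2900 slot. A toy illustration of priority-ordered dispatch (not flow's actual scheduler):

#include <functional>
#include <queue>
#include <vector>

struct Task {
	int priority;                     // e.g. TaskUpdateStorage = 3000
	std::function<void()> work;
	bool operator<(const Task& other) const { return priority < other.priority; }
};

int main() {
	std::priority_queue<Task> ready;  // max-heap: highest priority pops first
	ready.push({ 3000, []{ /* TaskUpdateStorage work */ } });
	ready.push({ 2000, []{ /* TaskLowPriority work */ } });
	while (!ready.empty()) {
		ready.top().work();
		ready.pop();
	}
}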

View File

@@ -32,7 +32,7 @@
 <Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
     <Product Name='$(var.Title)'
-             Id='{B6805A9A-CACA-4C08-9BE2-1EFAB91C9117}'
+             Id='{28BA5AF3-B475-4C57-85AF-80A0C049EFE1}'
              UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
              Version='$(var.Version)'
              Manufacturer='$(var.Manufacturer)'

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long