2017-05-26 04:48:44 +08:00
/*
* QuietDatabase . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# include "flow/ActorCollection.h"
# include "fdbrpc/simulator.h"
# include "flow/Trace.h"
# include "fdbclient/NativeAPI.h"
# include "fdbclient/DatabaseContext.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/TesterInterface.h"
# include "fdbserver/WorkerInterface.h"
# include "fdbserver/ServerDBInfo.h"
# include "fdbserver/Status.h"
2017-05-26 04:48:44 +08:00
# include "fdbclient/ManagementAPI.h"
2018-08-11 06:18:24 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
2017-10-25 03:58:54 +08:00
// Requests the worker list from the cluster interface currently published in dbInfo, forwarding
// `flags` in the GetWorkersRequest, and returns the reply.
// If dbInfo changes before a reply arrives, the loop starts over and issues the request against the new value.
// The reply is wrapped in brokenPromiseToNever (presumably so a broken promise just leaves the
// dbInfo->onChange() branch to fire -- confirm against its definition).
ACTOR Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0 ) {
	loop {
		choose {
			when( vector<std::pair<WorkerInterface, ProcessClass>> workerList = wait( brokenPromiseToNever( dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( flags ) ) ) ) ) {
				return workerList;
			}
			// dbInfo changed: fall through to the next loop iteration and ask again.
			when( wait( dbInfo->onChange() ) ) {}
		}
	}
}
// Gets the WorkerInterface representing the Master server.
// Fetches the worker list and returns the first worker whose address equals the master address
// published in dbInfo. When no worker matches, a warning is traced and the lookup is retried
// after a one second delay; the actor only finishes once a match is found.
ACTOR Future<WorkerInterface> getMasterWorker( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
	TraceEvent(" GetMasterWorker ").detail(" Stage ", " GettingWorkers ");

	loop {
		state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( dbInfo ) );

		for( int idx = 0; idx < workers.size(); ++idx ) {
			if( workers[idx].first.address() == dbInfo->get().master.address() ) {
				TraceEvent(" GetMasterWorker ")
					.detail(" Stage ", " GotWorkers ")
					.detail(" MasterId ", dbInfo->get().master.id())
					.detail(" WorkerId ", workers[idx].first.id());
				return workers[idx].first;
			}
		}

		// No worker is at the master's address (yet); report it and try again shortly.
		TraceEvent(SevWarn, " GetMasterWorkerError ")
			.detail(" Error ", " MasterWorkerNotFound ")
			.detail(" Master ", dbInfo->get().master.id())
			.detail(" MasterAddress ", dbInfo->get().master.address())
			.detail(" WorkerCount ", workers.size());

		wait( delay( 1.0 ) );
	}
}
// Gets the number of bytes in flight from the master.
// Requests the event-log entry named by the literal below from masterWorker (one second timeout)
// and parses its "TotalBytes" field as an integer.
// Throws attribute_not_found if the field cannot be parsed as an integer; any error raised while
// contacting the worker or reading the field is traced and rethrown.
ACTOR Future<int64_t> getDataInFlight( Database cx, WorkerInterface masterWorker ) {
	try {
		TraceEvent(" DataInFlight ").detail(" Stage ", " ContactingMaster ");
		TraceEventFields md = wait( timeoutError( masterWorker.eventLogRequest.getReply(
			EventLogRequest( LiteralStringRef(" TotalDataInFlight ") ) ), 1.0 ) );

		// %lld requires a long long*, so parse into a long long rather than an int64_t, and
		// check the conversion count so a malformed value is never returned as garbage.
		long long dataInFlight = 0;
		if( sscanf( md.getValue(" TotalBytes ").c_str(), " %lld ", &dataInFlight ) != 1 ) {
			throw attribute_not_found();
		}
		return static_cast<int64_t>( dataInFlight );
	} catch( Error &e ) {
		TraceEvent(" QuietDatabaseFailure ", masterWorker.id()).error(e).detail(" Reason ", " Failed to extract DataInFlight ");
		throw;
	}
}
// Gets the number of bytes in flight from the master.
// Convenience overload: first locates the master worker through dbInfo, then queries that worker.
ACTOR Future<int64_t> getDataInFlight( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
	WorkerInterface master = wait( getMasterWorker( cx, dbInfo ) );
	int64_t bytesInFlight = wait( getDataInFlight( cx, master ) );
	return bytesInFlight;
}
2017-11-15 04:59:42 +08:00
//Computes the queue size for storage servers and tlogs using the bytesInput and bytesDurable attributes
2018-05-03 01:44:38 +08:00
int64_t getQueueSize ( TraceEventFields md ) {
2017-11-15 04:59:42 +08:00
double inputRate , durableRate ;
double inputRoughness , durableRoughness ;
int64_t inputBytes , durableBytes ;
2017-05-26 04:48:44 +08:00
2018-06-09 04:24:30 +08:00
sscanf ( md . getValue ( " BytesInput " ) . c_str ( ) , " %lf %lf %lld " , & inputRate , & inputRoughness , & inputBytes ) ;
sscanf ( md . getValue ( " BytesDurable " ) . c_str ( ) , " %lf %lf %lld " , & durableRate , & durableRoughness , & durableBytes ) ;
2017-05-26 04:48:44 +08:00
2017-11-15 04:59:42 +08:00
return inputBytes - durableBytes ;
2017-05-26 04:48:44 +08:00
}
// Returns the largest queue size (as computed by getQueueSize) among the logs returned by
// dbInfo's logSystemConfig.allPresentLogs(). Each log's metrics event is requested, with a one
// second timeout, from the worker that shares the log's address.
// Throws attribute_not_found when a log has no worker at its address.
// This is not robust in the face of a TLog failure
ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
	TraceEvent(" MaxTLogQueueSize ").detail(" Stage ", " ContactingLogs ");

	state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( dbInfo ) );

	// Index the workers by network address so each log can be matched to its hosting worker.
	std::map<NetworkAddress, WorkerInterface> workersMap;
	for( int w = 0; w < workers.size(); w++ ) {
		workersMap[ workers[w].first.address() ] = workers[w].first;
	}

	state std::vector<Future<TraceEventFields>> messages;
	state std::vector<TLogInterface> tlogs = dbInfo->get().logSystemConfig.allPresentLogs();
	for( int t = 0; t < tlogs.size(); t++ ) {
		auto hostingWorker = workersMap.find( tlogs[t].address() );
		if( hostingWorker == workersMap.end() ) {
			TraceEvent(" QuietDatabaseFailure ").detail(" Reason ", " Could not find worker for log server ").detail(" Tlog ", tlogs[t].id());
			throw attribute_not_found();
		}
		messages.push_back( timeoutError( hostingWorker->second.eventLogRequest.getReply(
			EventLogRequest( StringRef( tlogs[t].id().toString() + " /TLogMetrics " ) ) ), 1.0 ) );
	}
	wait( waitForAll( messages ) );

	TraceEvent(" MaxTLogQueueSize ").detail(" Stage ", " ComputingMax ").detail(" MessageCount ", messages.size());

	state int64_t maxQueueSize = 0;
	// Declared as state, as in the original, because the catch block reads the index.
	state int msgIdx = 0;
	for( ; msgIdx < messages.size(); msgIdx++ ) {
		try {
			int64_t queueSize = getQueueSize( messages[msgIdx].get() );
			if( queueSize > maxQueueSize ) {
				maxQueueSize = queueSize;
			}
		} catch( Error &e ) {
			TraceEvent(" QuietDatabaseFailure ").detail(" Reason ", " Failed to extract MaxTLogQueue ").detail(" Tlog ", tlogs[msgIdx].id());
			throw;
		}
	}

	return maxQueueSize;
}
// Convenience overload: first locates the master worker through dbInfo, then computes the
// maximum TLog queue size with it.
ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
	WorkerInterface master = wait( getMasterWorker( cx, dbInfo ) );
	int64_t largestQueue = wait( getMaxTLogQueueSize( cx, dbInfo, master ) );
	return largestQueue;
}
// Reads the full server list (serverListKeys) in one transaction and returns the decoded
// StorageServerInterface of every entry.
// use_system_priority: when true the read is issued with PRIORITY_SYSTEM_IMMEDIATE.
// The transaction is always LOCK_AWARE. Errors are passed to tr.onError() and the read is retried.
ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, bool use_system_priority = false ) {
	state Transaction tr( cx );
	loop {
		// Options are (re)applied on every attempt: per the FoundationDB client API, onError()
		// resets the transaction, so options set only once before the loop would not be in
		// effect for retries.
		if( use_system_priority ) {
			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
		}
		tr.setOption( FDBTransactionOptions::LOCK_AWARE );

		try {
			Standalone<RangeResultRef> serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
			// The whole list must fit in a single range read.
			ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );

			vector<StorageServerInterface> servers;
			for( int i = 0; i < serverList.size(); i++ )
				servers.push_back( decodeServerListValue( serverList[i].value ) );
			return servers;
		}
		catch( Error &e ) {
			wait( tr.onError( e ) );
		}
	}
}
// Gets the maximum size of all the storage server queues.
// The storage server list and the worker list are fetched concurrently; each storage server's
// metrics event is then requested, with a one second timeout, from the worker that shares its
// address, and the largest getQueueSize() result is returned.
// Throws attribute_not_found when a storage server has no worker at its address.
ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
	TraceEvent(" MaxStorageServerQueueSize ").detail(" Stage ", " ContactingStorageServers ");

	// Start both lookups before waiting on either one.
	Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers( cx );
	state Future<std::vector<std::pair<WorkerInterface, ProcessClass>>> workersFuture = getWorkers( dbInfo );

	state std::vector<StorageServerInterface> servers = wait( serversFuture );
	state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( workersFuture );

	// Index the workers by network address so each storage server can be matched to its hosting worker.
	std::map<NetworkAddress, WorkerInterface> workersMap;
	for( int w = 0; w < workers.size(); w++ ) {
		workersMap[ workers[w].first.address() ] = workers[w].first;
	}

	state std::vector<Future<TraceEventFields>> messages;
	for( int s = 0; s < servers.size(); s++ ) {
		auto hostingWorker = workersMap.find( servers[s].address() );
		if( hostingWorker == workersMap.end() ) {
			TraceEvent(" QuietDatabaseFailure ").detail(" Reason ", " Could not find worker for storage server ").detail(" SS ", servers[s].id());
			throw attribute_not_found();
		}
		messages.push_back( timeoutError( hostingWorker->second.eventLogRequest.getReply(
			EventLogRequest( StringRef( servers[s].id().toString() + " /StorageMetrics " ) ) ), 1.0 ) );
	}

	wait( waitForAll( messages ) );

	TraceEvent(" MaxStorageServerQueueSize ").detail(" Stage ", " ComputingMax ").detail(" MessageCount ", messages.size());

	state int64_t maxQueueSize = 0;
	// Declared as state, as in the original, because the catch block reads the index.
	state int msgIdx = 0;
	for( ; msgIdx < messages.size(); msgIdx++ ) {
		try {
			int64_t queueSize = getQueueSize( messages[msgIdx].get() );
			if( queueSize > maxQueueSize ) {
				maxQueueSize = queueSize;
			}
		} catch( Error &e ) {
			TraceEvent(" QuietDatabaseFailure ", masterWorker.id()).detail(" Reason ", " Failed to extract MaxStorageServerQueue ").detail(" SS ", servers[msgIdx].id());
			throw;
		}
	}

	return maxQueueSize;
}
// Gets the maximum size of all the storage server queues.
// Convenience overload: first locates the master worker through dbInfo, then delegates to the
// overload that takes the master worker.
ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
	WorkerInterface master = wait( getMasterWorker( cx, dbInfo ) );
	int64_t largestQueue = wait( getMaxStorageServerQueueSize( cx, dbInfo, master ) );
	return largestQueue;
}
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of the queue.
// Requests the event-log entry named by the literal below from masterWorker (one second timeout)
// and parses its "InQueue" field, plus its "InFlight" field when reportInFlight is set.
// Throws attribute_not_found if a required field cannot be parsed as an integer; every error is
// traced and rethrown.
ACTOR Future<int64_t> getDataDistributionQueueSize( Database cx, WorkerInterface masterWorker, bool reportInFlight ) {
	try {
		TraceEvent(" DataDistributionQueueSize ").detail(" Stage ", " ContactingMaster ");

		TraceEventFields movingDataMessage = wait( timeoutError( masterWorker.eventLogRequest.getReply(
			EventLogRequest( LiteralStringRef(" MovingData ") ) ), 1.0 ) );

		TraceEvent(" DataDistributionQueueSize ").detail(" Stage ", " GotString ");

		// %lld requires a long long*, so parse into long long rather than int64_t, and check the
		// conversion count so a malformed value is never used uninitialized.
		long long inQueue = 0;
		if( sscanf( movingDataMessage.getValue(" InQueue ").c_str(), " %lld ", &inQueue ) != 1 ) {
			throw attribute_not_found();
		}

		if( reportInFlight ) {
			long long inFlight = 0;
			if( sscanf( movingDataMessage.getValue(" InFlight ").c_str(), " %lld ", &inFlight ) != 1 ) {
				throw attribute_not_found();
			}
			inQueue += inFlight;
		}

		return static_cast<int64_t>( inQueue );
	} catch( Error &e ) {
		TraceEvent(" QuietDatabaseFailure ", masterWorker.id()).detail(" Reason ", " Failed to extract DataDistributionQueueSize ");
		throw;
	}
}
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of the queue.
// Convenience overload: first locates the master worker through dbInfo, then queries that worker.
ACTOR Future<int64_t> getDataDistributionQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, bool reportInFlight ) {
	WorkerInterface master = wait( getMasterWorker( cx, dbInfo ) );
	int64_t queuedBytes = wait( getDataDistributionQueueSize( cx, master, reportInFlight ) );
	return queuedBytes;
}
// Checks that data distribution is active.
// Requests the event-log entry named by the literal below from masterWorker (one second timeout)
// and returns whether its "State" field equals the expected active value.
// Any error is traced and rethrown.
ACTOR Future<bool> getDataDistributionActive( Database cx, WorkerInterface masterWorker ) {
	try {
		TraceEvent(" DataDistributionActive ").detail(" Stage ", " ContactingMaster ");

		TraceEventFields activeMessage = wait( timeoutError( masterWorker.eventLogRequest.getReply(
			EventLogRequest( LiteralStringRef(" DDTrackerStarting ") ) ), 1.0 ) );

		bool isActive = ( activeMessage.getValue(" State ") == " Active " );
		return isActive;
	} catch( Error &e ) {
		TraceEvent(" QuietDatabaseFailure ", masterWorker.id()).detail(" Reason ", " Failed to extract DataDistributionActive ");
		throw;
	}
}
// Checks to see if any storage servers are being recruited.
// Requests from masterWorker (one second timeout) the event-log entry whose name is the prefix
// below followed by the master's id from dbInfo, and returns whether its "State" field equals the
// expected recruiting value. Any error is traced (with the master id) and rethrown.
ACTOR Future<bool> getStorageServersRecruiting( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
	try {
		TraceEvent(" StorageServersRecruiting ").detail(" Stage ", " ContactingMaster ");

		std::string eventName = " StorageServerRecruitment_ " + dbInfo->get().master.id().toString();
		TraceEventFields recruitingMessage = wait( timeoutError( masterWorker.eventLogRequest.getReply(
			EventLogRequest( StringRef( eventName ) ) ), 1.0 ) );

		bool isRecruiting = ( recruitingMessage.getValue(" State ") == " Recruiting " );
		return isRecruiting;
	} catch( Error &e ) {
		TraceEvent(" QuietDatabaseFailure ", masterWorker.id())
			.detail(" Reason ", " Failed to extract StorageServersRecruiting ")
			.detail(" MasterID ", dbInfo->get().master.id());
		throw;
	}
}
2018-07-17 01:06:57 +08:00
// In simulation with more than one usable region, checks whether the primary or the remote
// datacenter is dead (both being dead is asserted against). If one is, the simulator is switched
// to a single usable region and the database is reconfigured in two steps: first the dead side is
// disabled, then, once recovery has reached STORAGE_RECOVERED, usable_regions is set to 1.
// `context` is only used to label the trace events. Does nothing in every other case.
ACTOR Future<Void> repairDeadDatacenter( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, std::string context ) {
	// Only relevant for simulated multi-region clusters.
	if( !g_network->isSimulated() || g_simulator.usableRegions <= 1 ) {
		return Void();
	}

	bool primaryDead = g_simulator.datacenterDead( g_simulator.primaryDcId );
	bool remoteDead = g_simulator.datacenterDead( g_simulator.remoteDcId );
	ASSERT( !primaryDead || !remoteDead );

	// Both datacenters alive: nothing to repair.
	if( !primaryDead && !remoteDead ) {
		return Void();
	}

	TraceEvent(SevWarnAlways, " DisablingFearlessConfiguration ")
		.detail(" Location ", context)
		.detail(" Stage ", " Repopulate ")
		.detail(" RemoteDead ", remoteDead)
		.detail(" PrimaryDead ", primaryDead);
	g_simulator.usableRegions = 1;
	ConfigurationResult::Type _ = wait( changeConfig( cx, ( primaryDead ? g_simulator.disablePrimary : g_simulator.disableRemote ) + " repopulate_anti_quorum=1 ", true ) );

	// Hold off on the second configuration change until recovery has progressed far enough.
	while( dbInfo->get().recoveryState < RecoveryState::STORAGE_RECOVERED ) {
		wait( dbInfo->onChange() );
	}

	TraceEvent(SevWarnAlways, " DisablingFearlessConfiguration ").detail(" Location ", context).detail(" Stage ", " Usable_Regions ");
	ConfigurationResult::Type _ = wait( changeConfig( cx, " usable_regions=1 ", true ) );

	return Void();
}
2018-05-02 06:31:49 +08:00
2018-07-17 01:06:57 +08:00
// Waits for `time` (passed to delay()) and then runs repairDeadDatacenter with the given
// dbInfo and trace context.
ACTOR Future<Void> reconfigureAfter( Database cx, double time, Reference<AsyncVar<ServerDBInfo>> dbInfo, std::string context ) {
	wait( delay( time ) );

	wait( repairDeadDatacenter( cx, dbInfo, context ) );

	return Void();
}
2017-05-26 04:48:44 +08:00
// Waits until the database is "quiet" and then returns.
// Each pass locates the master worker and collects six measurements: data in flight, max TLog
// queue size, data distribution queue size, max storage server queue size, whether data
// distribution is active, and whether storage servers are being recruited.
// A pass succeeds when none of the first four values exceeds its gate, data distribution is
// active and no storage server is being recruited. The actor finishes after 3 consecutive
// successful passes.
//   phase                        - text inserted into the names of the trace events emitted here
//   dataInFlightGate             - largest accepted data-in-flight value; when it is 0, in-flight
//                                  data is counted as part of the data distribution queue instead
//   maxTLogQueueGate             - largest accepted max TLog queue size
//   maxStorageServerQueueGate    - largest accepted max storage server queue size
//   maxDataDistributionQueueSize - largest accepted data distribution queue size
// attribute_not_found and timed_out errors restart the checks after one second; any other error
// is rethrown to the caller.
ACTOR Future < Void > waitForQuietDatabase ( Database cx , Reference < AsyncVar < ServerDBInfo > > dbInfo , std : : string phase , int64_t dataInFlightGate = 2e6 ,
int64_t maxTLogQueueGate = 5e6 , int64_t maxStorageServerQueueGate = 5e6 , int64_t maxDataDistributionQueueSize = 0 ) {
2018-07-17 01:06:57 +08:00
// Started up front and held in a state variable: after a delay of 100 + random01()*100 it runs
// repairDeadDatacenter (see reconfigureAfter). The future itself is never waited on here.
state Future < Void > reconfig = reconfigureAfter ( cx , 100 + ( g_random - > random01 ( ) * 100 ) , dbInfo , " QuietDatabase " ) ;
2017-05-26 04:48:44 +08:00
TraceEvent ( ( " QuietDatabase " + phase + " Begin " ) . c_str ( ) ) ;
//In a simulated environment, wait 5 seconds so that workers can move to their optimal locations
if ( g_network - > isSimulated ( ) )
2018-08-11 04:57:10 +08:00
wait ( delay ( 5.0 ) ) ;
2017-05-26 04:48:44 +08:00
2018-01-15 04:50:52 +08:00
//Require 3 consecutive successful quiet database checks spaced 2 seconds apart
2017-05-26 04:48:44 +08:00
state int numSuccesses = 0 ;
loop {
try {
TraceEvent ( " QuietDatabaseWaitingOnMaster " ) ;
WorkerInterface masterWorker = wait ( getMasterWorker ( cx , dbInfo ) ) ;
TraceEvent ( " QuietDatabaseGotMaster " ) ;
// All six queries are started here before any of them is waited on; the single wait below
// then requires every one of them to succeed.
state Future < int64_t > dataInFlight = getDataInFlight ( cx , masterWorker ) ;
state Future < int64_t > tLogQueueSize = getMaxTLogQueueSize ( cx , dbInfo , masterWorker ) ;
// reportInFlight is true only when dataInFlightGate is 0.
state Future < int64_t > dataDistributionQueueSize = getDataDistributionQueueSize ( cx , masterWorker , dataInFlightGate = = 0 ) ;
state Future < int64_t > storageQueueSize = getMaxStorageServerQueueSize ( cx , dbInfo , masterWorker ) ;
state Future < bool > dataDistributionActive = getDataDistributionActive ( cx , masterWorker ) ;
state Future < bool > storageServersRecruiting = getStorageServersRecruiting ( cx , dbInfo , masterWorker ) ;
2018-08-11 04:57:10 +08:00
wait ( success ( dataInFlight ) & & success ( tLogQueueSize ) & & success ( dataDistributionQueueSize )
2017-05-26 04:48:44 +08:00
& & success ( storageQueueSize ) & & success ( dataDistributionActive ) & & success ( storageServersRecruiting ) ) ;
TraceEvent ( ( " QuietDatabase " + phase ) . c_str ( ) )
2018-06-09 02:11:08 +08:00
. detail ( " DataInFlight " , dataInFlight . get ( ) ) . detail ( " MaxTLogQueueSize " , tLogQueueSize . get ( ) ) . detail ( " DataDistributionQueueSize " , dataDistributionQueueSize . get ( ) )
. detail ( " MaxStorageQueueSize " , storageQueueSize . get ( ) ) . detail ( " DataDistributionActive " , dataDistributionActive . get ( ) )
. detail ( " StorageServersRecruiting " , storageServersRecruiting . get ( ) ) ;
2017-05-26 04:48:44 +08:00
// Not quiet: some gate is exceeded, data distribution is inactive, or recruitment is in
// progress. Wait one second and restart the count of consecutive successes.
if ( dataInFlight . get ( ) > dataInFlightGate | | tLogQueueSize . get ( ) > maxTLogQueueGate
| | dataDistributionQueueSize . get ( ) > maxDataDistributionQueueSize | | storageQueueSize . get ( ) > maxStorageServerQueueGate
| | dataDistributionActive . get ( ) = = false | | storageServersRecruiting . get ( ) = = true ) {
2018-08-11 04:57:10 +08:00
wait ( delay ( 1.0 ) ) ;
2017-05-26 04:48:44 +08:00
numSuccesses = 0 ;
} else {
// Quiet on this pass: finish on the third consecutive success, otherwise check again in two seconds.
2018-01-15 04:50:52 +08:00
if ( + + numSuccesses = = 3 ) {
2017-05-26 04:48:44 +08:00
TraceEvent ( ( " QuietDatabase " + phase + " Done " ) . c_str ( ) ) ;
break ;
}
else
2018-08-11 04:57:10 +08:00
wait ( delay ( 2.0 ) ) ;
2017-05-26 04:48:44 +08:00
}
} catch ( Error & e ) {
// Errors other than cancellation and the two retried codes get an "Error" trace event.
if ( e . code ( ) ! = error_code_actor_cancelled & & e . code ( ) ! = error_code_attribute_not_found & & e . code ( ) ! = error_code_timed_out )
TraceEvent ( ( " QuietDatabase " + phase + " Error " ) . c_str ( ) ) . error ( e ) ;
//attribute_not_found and timed_out occur if we don't get back a usable message from one of the servers, often corrected by retrying; everything else is rethrown
if ( e . code ( ) ! = error_code_attribute_not_found & & e . code ( ) ! = error_code_timed_out )
throw ;
TraceEvent ( ( " QuietDatabase " + phase + " Retry " ) . c_str ( ) ) . error ( e ) ;
2018-08-11 04:57:10 +08:00
wait ( delay ( 1.0 ) ) ;
2017-05-26 04:48:44 +08:00
// A failed pass also breaks the run of consecutive successes.
numSuccesses = 0 ;
}
}
return Void ( ) ;
}
// Non-actor entry point: forwards every argument unchanged to waitForQuietDatabase and returns
// its future.
Future<Void> quietDatabase( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo, std::string phase, int64_t dataInFlightGate,
							int64_t maxTLogQueueGate, int64_t maxStorageServerQueueGate, int64_t maxDataDistributionQueueSize ) {
	Future<Void> quiet = waitForQuietDatabase( cx, dbInfo, phase, dataInFlightGate, maxTLogQueueGate,
											   maxStorageServerQueueGate, maxDataDistributionQueueSize );
	return quiet;
}