2017-05-26 04:48:44 +08:00
/*
* Ratekeeper . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# include "flow/IndexedSet.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/Ratekeeper.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/FailureMonitor.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/Knobs.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/Smoother.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/ServerDBInfo.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/simulator.h"
2019-02-20 08:04:52 +08:00
# include "fdbclient/ReadYourWrites.h"
2018-08-11 06:18:24 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
// Reasons ratekeeper can report for the transaction rate limit it computed.
// The enumerators are used as indices into limitReasonName and limitReasonDesc
// below, so the enum and both tables must list their entries in the same order.
// The static_asserts after each table enforce that the lengths agree.
enum limitReason_t {
	unlimited, // TODO: rename to workload?
	storage_server_write_queue_size,
	storage_server_write_bandwidth_mvcc,
	storage_server_readable_behind,
	log_server_mvcc_write_bandwidth,
	log_server_write_queue,
	storage_server_min_free_space, // a storage server's normal limits are being reduced by low free space
	storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
	log_server_min_free_space,
	log_server_min_free_space_ratio,
	limitReason_t_end // count of the reasons above; not itself a reason
};

// Number of limit reasons as a plain int (has external linkage; presumably read
// from other translation units -- confirm before changing its type or name).
int limitReasonEnd = limitReason_t_end;

// Short identifier for each reason, indexed by limitReason_t.
const char* limitReasonName[] = {
	" workload ",
	" storage_server_write_queue_size ",
	" storage_server_write_bandwidth_mvcc ",
	" storage_server_readable_behind ",
	" log_server_mvcc_write_bandwidth ",
	" log_server_write_queue ",
	" storage_server_min_free_space ",
	" storage_server_min_free_space_ratio ",
	" log_server_min_free_space ",
	" log_server_min_free_space_ratio "
};
// The message names the table being checked, so a mismatch points at the right array.
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, " limitReasonName table size ");

// NOTE: This has a corresponding table in Script.cs (see RatekeeperReason graph)
// IF UPDATING THIS ARRAY, UPDATE SCRIPT.CS!
// Human-readable description for each reason, indexed by limitReason_t.
const char* limitReasonDesc[] = {
	" Workload or read performance. ",
	" Storage server performance (storage queue). ",
	" Storage server MVCC memory. ",
	" Storage server version falling behind. ",
	" Log server MVCC memory. ",
	" Storage server performance (log queue). ",
	" Storage server running out of space (approaching 100MB limit). ",
	" Storage server running out of space (approaching 5% limit). ",
	" Log server running out of space (approaching 100MB limit). ",
	" Log server running out of space (approaching 5% limit). ",
};
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, " limitReasonDesc table size ");
// Ratekeeper's record for a single storage server: the two most recent
// queuing-metrics replies and smoothed views of the counters taken from them.
struct StorageQueueInfo {
	// Set to true by trackStorageServerQueueInfo when a reply arrives and to
	// false when a request yields no reply; updateRate skips invalid entries.
	bool valid;
	UID id;
	LocalityData locality;
	StorageQueuingMetricsReply lastReply;
	StorageQueuingMetricsReply prevReply;
	Smoother smoothDurableBytes;
	Smoother smoothInputBytes;
	Smoother verySmoothDurableBytes;
	Smoother smoothDurableVersion;
	Smoother smoothLatestVersion;
	Smoother smoothFreeSpace;
	Smoother smoothTotalSpace;

	StorageQueueInfo(UID id, LocalityData locality)
	  : valid(false),
	    id(id),
	    locality(locality),
	    smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
	    smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
	    verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
	    smoothDurableVersion(1.),
	    smoothLatestVersion(1.),
	    smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
	    smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT)
	{
		// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
		// (that actor compares prevReply.instanceID, copied from lastReply, against the first real reply).
		lastReply.instanceID = -1;
	}
};
// Ratekeeper's record for a single tlog: the two most recent queuing-metrics
// replies and smoothed views of the counters taken from them.
struct TLogQueueInfo {
	// Set to true by trackTLogQueueInfo when a reply arrives and to false when
	// a request yields no reply; updateRate skips invalid entries.
	bool valid;
	UID id;
	TLogQueuingMetricsReply lastReply;
	TLogQueuingMetricsReply prevReply;
	Smoother smoothDurableBytes;
	Smoother smoothInputBytes;
	Smoother verySmoothDurableBytes;
	Smoother smoothFreeSpace;
	Smoother smoothTotalSpace;

	TLogQueueInfo(UID id)
	  : valid(false),
	    id(id),
	    smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
	    smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
	    verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
	    smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
	    smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT)
	{
		// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from StorageQueueInfo)
		lastReply.instanceID = -1;
	}
};
2019-02-28 02:31:56 +08:00
struct RatekeeperLimits {
double tpsLimit ;
Int64MetricHandle tpsLimitMetric ;
Int64MetricHandle reasonMetric ;
int64_t storageTargetBytes ;
int64_t storageSpringBytes ;
int64_t logTargetBytes ;
int64_t logSpringBytes ;
int64_t maxVersionDifference ;
std : : string context ;
RatekeeperLimits ( std : : string context , int64_t storageTargetBytes , int64_t storageSpringBytes , int64_t logTargetBytes , int64_t logSpringBytes , int64_t maxVersionDifference ) :
tpsLimit ( std : : numeric_limits < double > : : infinity ( ) ) ,
tpsLimitMetric ( StringRef ( " Ratekeeper.TPSLimit " + context ) ) ,
reasonMetric ( StringRef ( " Ratekeeper.Reason " + context ) ) ,
storageTargetBytes ( storageTargetBytes ) ,
storageSpringBytes ( storageSpringBytes ) ,
logTargetBytes ( logTargetBytes ) ,
logSpringBytes ( logSpringBytes ) ,
maxVersionDifference ( maxVersionDifference ) ,
context ( context )
{ }
} ;
2017-05-26 04:48:44 +08:00
struct Ratekeeper {
Map < UID , StorageQueueInfo > storageQueueInfo ;
Map < UID , TLogQueueInfo > tlogQueueInfo ;
std : : map < UID , std : : pair < int64_t , double > > proxy_transactionCountAndTime ;
Smoother smoothReleasedTransactions , smoothTotalDurableBytes ;
DatabaseConfiguration configuration ;
Int64MetricHandle actualTpsMetric ;
2019-02-28 02:31:56 +08:00
2017-05-26 04:48:44 +08:00
double lastWarning ;
double * lastLimited ;
2019-02-28 02:31:56 +08:00
RatekeeperLimits normalLimits ;
RatekeeperLimits batchLimits ;
Ratekeeper ( ) : smoothReleasedTransactions ( SERVER_KNOBS - > SMOOTHING_AMOUNT ) , smoothTotalDurableBytes ( SERVER_KNOBS - > SLOW_SMOOTHING_AMOUNT ) ,
2017-05-26 04:48:44 +08:00
actualTpsMetric ( LiteralStringRef ( " Ratekeeper.ActualTPS " ) ) ,
2019-02-28 02:31:56 +08:00
lastWarning ( 0 ) ,
normalLimits ( " " , SERVER_KNOBS - > TARGET_BYTES_PER_STORAGE_SERVER , SERVER_KNOBS - > SPRING_BYTES_STORAGE_SERVER , SERVER_KNOBS - > TARGET_BYTES_PER_TLOG , SERVER_KNOBS - > SPRING_BYTES_TLOG , SERVER_KNOBS - > MAX_TL_SS_VERSION_DIFFERENCE ) ,
batchLimits ( " Batch " , SERVER_KNOBS - > TARGET_BYTES_PER_STORAGE_SERVER_BATCH , SERVER_KNOBS - > SPRING_BYTES_STORAGE_SERVER_BATCH , SERVER_KNOBS - > TARGET_BYTES_PER_TLOG_BATCH , SERVER_KNOBS - > SPRING_BYTES_TLOG_BATCH , SERVER_KNOBS - > MAX_TL_SS_VERSION_DIFFERENCE_BATCH )
2017-05-26 04:48:44 +08:00
{ }
} ;
//SOMEDAY: template trackStorageServerQueueInfo and trackTLogQueueInfo into one function
// Repeatedly requests queuing metrics from one storage server and folds each
// reply into that server's entry in self->storageQueueInfo. The entry is
// inserted when the actor starts and erased when it exits through any
// exception (including cancellation).
ACTOR Future<Void> trackStorageServerQueueInfo(Ratekeeper* self, StorageServerInterface ssi) {
	self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality)));
	state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
	TraceEvent(" RkTracking ", ssi.id());
	try {
		loop {
			ErrorOr<StorageQueuingMetricsReply> reply = wait(ssi.getQueuingMetrics.getReplyUnlessFailedFor(StorageQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
			StorageQueueInfo& info = myQueueInfo->value;
			if (!reply.present()) {
				// Trace only on the valid -> invalid transition.
				if (info.valid) {
					TraceEvent(" RkStorageServerDidNotRespond ", ssi.id());
				}
				info.valid = false;
			} else {
				const StorageQueuingMetricsReply& metrics = reply.get();
				info.valid = true;
				info.prevReply = info.lastReply;
				info.lastReply = metrics;
				const bool instanceChanged = info.prevReply.instanceID != metrics.instanceID;
				if (!instanceChanged) {
					// Same instance as the previous reply: accumulate the new totals.
					self->smoothTotalDurableBytes.addDelta(metrics.bytesDurable - info.prevReply.bytesDurable);
					info.smoothDurableBytes.setTotal(metrics.bytesDurable);
					info.verySmoothDurableBytes.setTotal(metrics.bytesDurable);
					info.smoothInputBytes.setTotal(metrics.bytesInput);
					info.smoothFreeSpace.setTotal(metrics.storageBytes.available);
					info.smoothTotalSpace.setTotal(metrics.storageBytes.total);
				} else {
					// Different instance ID (or the very first reply): restart the smoothers from these values.
					info.smoothDurableBytes.reset(metrics.bytesDurable);
					info.verySmoothDurableBytes.reset(metrics.bytesDurable);
					info.smoothInputBytes.reset(metrics.bytesInput);
					info.smoothFreeSpace.reset(metrics.storageBytes.available);
					info.smoothTotalSpace.reset(metrics.storageBytes.total);
				}
			}
			wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && IFailureMonitor::failureMonitor().onStateEqual(ssi.getQueuingMetrics.getEndpoint(), FailureStatus(false)));
		}
	} catch (...) {
		// including cancellation
		self->storageQueueInfo.erase(myQueueInfo);
		throw;
	}
}
// Repeatedly requests queuing metrics from one tlog and folds each reply into
// that tlog's entry in self->tlogQueueInfo. The entry is inserted when the
// actor starts and erased when it exits through any exception (including
// cancellation).
ACTOR Future<Void> trackTLogQueueInfo(Ratekeeper* self, TLogInterface tli) {
	self->tlogQueueInfo.insert(mapPair(tli.id(), TLogQueueInfo(tli.id())));
	state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
	TraceEvent(" RkTracking ", tli.id());
	try {
		loop {
			ErrorOr<TLogQueuingMetricsReply> reply = wait(tli.getQueuingMetrics.getReplyUnlessFailedFor(TLogQueuingMetricsRequest(), 0, 0)); // SOMEDAY: or tryGetReply?
			TLogQueueInfo& info = myQueueInfo->value;
			if (!reply.present()) {
				// Trace only on the valid -> invalid transition.
				if (info.valid) {
					TraceEvent(" RkTLogDidNotRespond ", tli.id());
				}
				info.valid = false;
			} else {
				const TLogQueuingMetricsReply& metrics = reply.get();
				info.valid = true;
				info.prevReply = info.lastReply;
				info.lastReply = metrics;
				const bool instanceChanged = info.prevReply.instanceID != metrics.instanceID;
				if (!instanceChanged) {
					// Same instance as the previous reply: accumulate the new totals.
					self->smoothTotalDurableBytes.addDelta(metrics.bytesDurable - info.prevReply.bytesDurable);
					info.smoothDurableBytes.setTotal(metrics.bytesDurable);
					info.verySmoothDurableBytes.setTotal(metrics.bytesDurable);
					info.smoothInputBytes.setTotal(metrics.bytesInput);
					info.smoothFreeSpace.setTotal(metrics.storageBytes.available);
					info.smoothTotalSpace.setTotal(metrics.storageBytes.total);
				} else {
					// Different instance ID (or the very first reply): restart the smoothers from these values.
					info.smoothDurableBytes.reset(metrics.bytesDurable);
					info.verySmoothDurableBytes.reset(metrics.bytesDurable);
					info.smoothInputBytes.reset(metrics.bytesInput);
					info.smoothFreeSpace.reset(metrics.storageBytes.available);
					info.smoothTotalSpace.reset(metrics.storageBytes.total);
				}
			}
			wait(delayJittered(SERVER_KNOBS->METRIC_UPDATE_RATE) && IFailureMonitor::failureMonitor().onStateEqual(tli.getQueuingMetrics.getEndpoint(), FailureStatus(false)));
		}
	} catch (...) {
		// including cancellation
		self->tlogQueueInfo.erase(myQueueInfo);
		throw;
	}
}
// Waits on `in` and passes its outcome through unchanged. If `in` fails with
// anything other than actor_cancelled, the error is also delivered to `errOut`
// (unless `errOut` has already been set) before being rethrown.
ACTOR Future<Void> splitError(Future<Void> in, Promise<Void> errOut) {
	try {
		wait(in);
		return Void();
	} catch (Error& e) {
		const bool wasCancelled = e.code() == error_code_actor_cancelled;
		if (!(wasCancelled || errOut.isSet())) {
			errOut.sendError(e);
		}
		throw;
	}
}
// Keeps one trackStorageServerQueueInfo actor per storage server, driven by the
// `serverChanges` stream: an entry carrying an interface (re)starts the tracker
// for that UID, an entry without one removes it. An error from any tracker is
// routed through `err` and thrown from this actor.
ACTOR Future<Void> trackEachStorageServer(
	Ratekeeper* self,
	FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges)
{
	state Map<UID, Future<Void>> actors;
	state Promise<Void> err;
	loop choose {
		when(state std::pair<UID, Optional<StorageServerInterface>> change = waitNext(serverChanges)) {
			wait(delay(0)); // prevent storageServerTracker from getting cancelled while on the call stack
			if (!change.second.present()) {
				actors.erase(change.first);
			} else {
				auto& tracker = actors[change.first];
				// Clear the slot before starting the replacement; presumably this releases any
				// previous tracker for the UID first -- confirm against Flow's future semantics.
				tracker = Future<Void>();
				tracker = splitError(trackStorageServerQueueInfo(self, change.second.get()), err);
			}
		}
		when(wait(err.getFuture())) {}
	}
}
2019-02-28 02:31:56 +08:00
// Recomputes limits.tpsLimit (and the associated metrics) from the queue state
// currently recorded in self->storageQueueInfo and self->tlogQueueInfo.
//
// self   - ratekeeper state; actualTpsMetric, lastWarning and *lastLimited may be written.
// limits - the limit set to update; its target/spring/version-difference thresholds are
//          read, and tpsLimit, tpsLimitMetric and reasonMetric are written.
//
// Outline: (1) compute a per-storage-server TPS limit, (2) choose the limiting storage
// server while ignoring a bounded number of the worst zones, (3) compute version lag
// between tlogs and storage servers, (4) apply per-tlog limits, (5) clamp, publish
// metrics and occasionally emit a trace event.
void updateRate ( Ratekeeper * self , RatekeeperLimits & limits ) {
2017-05-26 04:48:44 +08:00
//double controlFactor = ; // dt / eFoldingTime
2019-02-28 02:31:56 +08:00
double actualTps = self - > smoothReleasedTransactions . smoothRate ( ) ;
self - > actualTpsMetric = ( int64_t ) actualTps ;
2017-05-26 04:48:44 +08:00
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
2019-02-28 02:31:56 +08:00
actualTps = std : : max ( std : : max ( 1.0 , actualTps ) , self - > smoothTotalDurableBytes . smoothRate ( ) / CLIENT_KNOBS - > TRANSACTION_SIZE_LIMIT ) ;
2017-05-26 04:48:44 +08:00
2019-02-28 02:31:56 +08:00
limits . tpsLimit = std : : numeric_limits < double > : : infinity ( ) ;
2017-05-26 04:48:44 +08:00
UID reasonID = UID ( ) ;
limitReason_t limitReason = limitReason_t : : unlimited ;
int sscount = 0 ;
int64_t worstFreeSpaceStorageServer = std : : numeric_limits < int64_t > : : max ( ) ;
int64_t worstStorageQueueStorageServer = 0 ;
int64_t limitingStorageQueueStorageServer = 0 ;
2019-02-28 02:31:56 +08:00
// Storage servers keyed by their computed TPS limit, so iteration goes from most to least restrictive.
std : : multimap < double , StorageQueueInfo * > storageTpsLimitReverseIndex ;
// The limit reason chosen for each storage server, keyed by server id.
std : : map < UID , limitReason_t > ssReasons ;
2017-05-26 04:48:44 +08:00
// Look at each storage server's write queue, compute and store the desired rate ratio
for ( auto i = self - > storageQueueInfo . begin ( ) ; i ! = self - > storageQueueInfo . end ( ) ; + + i ) {
auto & ss = i - > value ;
if ( ! ss . valid ) continue ;
+ + sscount ;
2019-02-28 02:31:56 +08:00
limitReason_t ssLimitReason = limitReason_t : : unlimited ;
2017-05-26 04:48:44 +08:00
int64_t minFreeSpace = std : : max ( SERVER_KNOBS - > MIN_FREE_SPACE , ( int64_t ) ( SERVER_KNOBS - > MIN_FREE_SPACE_RATIO * ss . smoothTotalSpace . smoothTotal ( ) ) ) ;
worstFreeSpaceStorageServer = std : : min ( worstFreeSpaceStorageServer , ( int64_t ) ss . smoothFreeSpace . smoothTotal ( ) - minFreeSpace ) ;
2019-02-28 02:31:56 +08:00
// Both spring and target are shrunk when usable free space (free - minFreeSpace) is small, and floored at 1.
int64_t springBytes = std : : max < int64_t > ( 1 , std : : min < int64_t > ( limits . storageSpringBytes , ( ss . smoothFreeSpace . smoothTotal ( ) - minFreeSpace ) * 0.2 ) ) ;
int64_t targetBytes = std : : max < int64_t > ( 1 , std : : min ( limits . storageTargetBytes , ( int64_t ) ss . smoothFreeSpace . smoothTotal ( ) - minFreeSpace ) ) ;
// A reduced target means free space is what constrains this server; record which floor applied.
if ( targetBytes ! = limits . storageTargetBytes ) {
2017-05-26 04:48:44 +08:00
if ( minFreeSpace = = SERVER_KNOBS - > MIN_FREE_SPACE ) {
2019-02-28 02:31:56 +08:00
ssLimitReason = limitReason_t : : storage_server_min_free_space ;
2017-05-26 04:48:44 +08:00
} else {
2019-02-28 02:31:56 +08:00
ssLimitReason = limitReason_t : : storage_server_min_free_space_ratio ;
2017-05-26 04:48:44 +08:00
}
}
int64_t storageQueue = ss . lastReply . bytesInput - ss . smoothDurableBytes . smoothTotal ( ) ;
worstStorageQueueStorageServer = std : : max ( worstStorageQueueStorageServer , storageQueue ) ;
int64_t b = storageQueue - targetBytes ;
// Ratio is 0 when the queue is springBytes below target, 1 at target, and capped at 2.
double targetRateRatio = std : : min ( ( b + springBytes ) / ( double ) springBytes , 2.0 ) ;
double inputRate = ss . smoothInputBytes . smoothRate ( ) ;
2019-02-28 02:31:56 +08:00
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
2017-05-26 04:48:44 +08:00
/*if( g_random->random01() < 0.1 ) {
2019-02-28 02:31:56 +08:00
std : : string name = " RateKeeperUpdateRate " + limits . context ;
TraceEvent ( name , ss . id )
2017-05-26 04:48:44 +08:00
. detail ( " MinFreeSpace " , minFreeSpace )
. detail ( " SpringBytes " , springBytes )
. detail ( " TargetBytes " , targetBytes )
. detail ( " SmoothTotalSpaceTotal " , ss . smoothTotalSpace . smoothTotal ( ) )
. detail ( " SmoothFreeSpaceTotal " , ss . smoothFreeSpace . smoothTotal ( ) )
. detail ( " LastReplyBytesInput " , ss . lastReply . bytesInput )
. detail ( " SmoothDurableBytesTotal " , ss . smoothDurableBytes . smoothTotal ( ) )
. detail ( " TargetRateRatio " , targetRateRatio )
. detail ( " SmoothInputBytesRate " , ss . smoothInputBytes . smoothRate ( ) )
2019-02-28 02:31:56 +08:00
. detail ( " ActualTPS " , actualTps )
2017-05-26 04:48:44 +08:00
. detail ( " InputRate " , inputRate )
. detail ( " VerySmoothDurableBytesRate " , ss . verySmoothDurableBytes . smoothRate ( ) )
2018-06-09 02:11:08 +08:00
. detail ( " B " , b ) ;
2017-05-26 04:48:44 +08:00
} */
// Don't let any storage server use up its target bytes faster than its MVCC window!
double maxBytesPerSecond = ( targetBytes - springBytes ) / ( ( ( ( double ) SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) / SERVER_KNOBS - > VERSIONS_PER_SECOND ) + 2.0 ) ;
2019-02-28 02:31:56 +08:00
double limitTps = std : : min ( actualTps * maxBytesPerSecond / std : : max ( 1.0e-8 , inputRate ) , maxBytesPerSecond * SERVER_KNOBS - > MAX_TRANSACTIONS_PER_BYTE ) ;
if ( ssLimitReason = = limitReason_t : : unlimited )
ssLimitReason = limitReason_t : : storage_server_write_bandwidth_mvcc ;
2017-05-26 04:48:44 +08:00
// Queue-size based limit; it replaces the bandwidth limit above only when it is lower.
if ( targetRateRatio > 0 & & inputRate > 0 ) {
ASSERT ( inputRate ! = 0 ) ;
2019-02-28 02:31:56 +08:00
double smoothedRate = std : : max ( ss . verySmoothDurableBytes . smoothRate ( ) , actualTps / SERVER_KNOBS - > MAX_TRANSACTIONS_PER_BYTE ) ;
2017-05-26 04:48:44 +08:00
double x = smoothedRate / ( inputRate * targetRateRatio ) ;
2019-02-28 02:31:56 +08:00
double lim = actualTps * x ;
if ( lim < limitTps ) {
limitTps = lim ;
// A free-space reason recorded earlier is kept; only the default reasons are overwritten.
if ( ssLimitReason = = limitReason_t : : unlimited | | ssLimitReason = = limitReason_t : : storage_server_write_bandwidth_mvcc )
ssLimitReason = limitReason_t : : storage_server_write_queue_size ;
2017-05-26 04:48:44 +08:00
}
}
2019-02-28 02:31:56 +08:00
storageTpsLimitReverseIndex . insert ( std : : make_pair ( limitTps , & ss ) ) ;
2017-05-26 04:48:44 +08:00
2019-02-28 02:31:56 +08:00
// Free-space limits are applied to tpsLimit here, directly, for every server; they do not go
// through the ignored-zones selection in the loop that follows.
if ( limitTps < limits . tpsLimit & & ( ssLimitReason = = limitReason_t : : storage_server_min_free_space | | ssLimitReason = = limitReason_t : : storage_server_min_free_space_ratio ) ) {
2017-05-26 04:48:44 +08:00
reasonID = ss . id ;
2019-02-28 02:31:56 +08:00
limits . tpsLimit = limitTps ;
limitReason = ssLimitReason ;
2017-05-26 04:48:44 +08:00
}
2019-02-28 02:31:56 +08:00
ssReasons [ ss . id ] = ssLimitReason ;
2017-05-26 04:48:44 +08:00
}
// Walk storage servers from lowest limit upward (only those below the current tpsLimit).
// Zones are added to ignoredMachines until it holds min(storageTeamSize - 1,
// MAX_MACHINES_FALLING_BEHIND) entries; the first server after that which is not in an
// ignored zone sets tpsLimit, and the loop stops.
std : : set < Optional < Standalone < StringRef > > > ignoredMachines ;
2019-02-28 02:31:56 +08:00
for ( auto ss = storageTpsLimitReverseIndex . begin ( ) ; ss ! = storageTpsLimitReverseIndex . end ( ) & & ss - > first < limits . tpsLimit ; + + ss ) {
2017-05-26 04:48:44 +08:00
if ( ignoredMachines . size ( ) < std : : min ( self - > configuration . storageTeamSize - 1 , SERVER_KNOBS - > MAX_MACHINES_FALLING_BEHIND ) ) {
ignoredMachines . insert ( ss - > second - > locality . zoneId ( ) ) ;
continue ;
}
if ( ignoredMachines . count ( ss - > second - > locality . zoneId ( ) ) > 0 ) {
continue ;
}
limitingStorageQueueStorageServer = ss - > second - > lastReply . bytesInput - ss - > second - > smoothDurableBytes . smoothTotal ( ) ;
2019-02-28 02:31:56 +08:00
limits . tpsLimit = ss - > first ;
limitReason = ssReasons [ storageTpsLimitReverseIndex . begin ( ) - > second - > id ] ;
reasonID = storageTpsLimitReverseIndex . begin ( ) - > second - > id ; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
2017-05-26 04:48:44 +08:00
break ;
}
double writeToReadLatencyLimit = 0 ;
Version worstVersionLag = 0 ;
Version limitingVersionLag = 0 ;
// Version lag: highest version reported by a valid tlog minus the lowest version reported by a
// valid storage server, computed over all servers (worst) and over non-ignored zones (limiting).
{
Version minSSVer = std : : numeric_limits < Version > : : max ( ) ;
Version minLimitingSSVer = std : : numeric_limits < Version > : : max ( ) ;
for ( auto i = self - > storageQueueInfo . begin ( ) ; i ! = self - > storageQueueInfo . end ( ) ; + + i ) {
auto & ss = i - > value ;
if ( ! ss . valid ) continue ;
minSSVer = std : : min ( minSSVer , ss . lastReply . v ) ;
// Machines that ratekeeper isn't controlling can fall arbitrarily far behind
if ( ignoredMachines . count ( i - > value . locality . zoneId ( ) ) = = 0 ) {
minLimitingSSVer = std : : min ( minLimitingSSVer , ss . lastReply . v ) ;
}
}
Version maxTLVer = std : : numeric_limits < Version > : : min ( ) ;
for ( auto i = self - > tlogQueueInfo . begin ( ) ; i ! = self - > tlogQueueInfo . end ( ) ; + + i ) {
auto & tl = i - > value ;
if ( ! tl . valid ) continue ;
maxTLVer = std : : max ( maxTLVer , tl . lastReply . v ) ;
}
// writeToReadLatencyLimit: 0 = infinte speed; 1 = TL durable speed ; 2 = half TL durable speed
// NOTE(review): if Version is an integer type, every operand below is integral, so the
// division truncates before the result is stored in the double -- confirm this is intended.
// NOTE(review): with no valid tlog or no valid storage server the operands are still the
// numeric_limits sentinels set above -- confirm the subtraction cannot overflow in practice.
2019-02-28 02:31:56 +08:00
writeToReadLatencyLimit = ( ( maxTLVer - minLimitingSSVer ) - limits . maxVersionDifference / 2 ) / ( limits . maxVersionDifference / 4 ) ;
2017-05-26 04:48:44 +08:00
worstVersionLag = std : : max ( ( Version ) 0 , maxTLVer - minSSVer ) ;
limitingVersionLag = std : : max ( ( Version ) 0 , maxTLVer - minLimitingSSVer ) ;
}
int64_t worstFreeSpaceTLog = std : : numeric_limits < int64_t > : : max ( ) ;
int64_t worstStorageQueueTLog = 0 ;
int tlcount = 0 ;
// Apply each valid tlog's limits. Unlike the storage servers above, every tlog can lower
// tpsLimit directly; there is no ignored set here.
for ( auto i = self - > tlogQueueInfo . begin ( ) ; i ! = self - > tlogQueueInfo . end ( ) ; + + i ) {
auto & tl = i - > value ;
if ( ! tl . valid ) continue ;
+ + tlcount ;
limitReason_t tlogLimitReason = limitReason_t : : log_server_write_queue ;
int64_t minFreeSpace = std : : max ( SERVER_KNOBS - > MIN_FREE_SPACE , ( int64_t ) ( SERVER_KNOBS - > MIN_FREE_SPACE_RATIO * tl . smoothTotalSpace . smoothTotal ( ) ) ) ;
worstFreeSpaceTLog = std : : min ( worstFreeSpaceTLog , ( int64_t ) tl . smoothFreeSpace . smoothTotal ( ) - minFreeSpace ) ;
2019-02-28 02:31:56 +08:00
int64_t springBytes = std : : max < int64_t > ( 1 , std : : min < int64_t > ( limits . logSpringBytes , ( tl . smoothFreeSpace . smoothTotal ( ) - minFreeSpace ) * 0.2 ) ) ;
int64_t targetBytes = std : : max < int64_t > ( 1 , std : : min ( limits . logTargetBytes , ( int64_t ) tl . smoothFreeSpace . smoothTotal ( ) - minFreeSpace ) ) ;
if ( targetBytes ! = limits . logTargetBytes ) {
2017-05-26 04:48:44 +08:00
if ( minFreeSpace = = SERVER_KNOBS - > MIN_FREE_SPACE ) {
tlogLimitReason = limitReason_t : : log_server_min_free_space ;
} else {
tlogLimitReason = limitReason_t : : log_server_min_free_space_ratio ;
}
}
int64_t queue = tl . lastReply . bytesInput - tl . smoothDurableBytes . smoothTotal ( ) ;
int64_t b = queue - targetBytes ;
worstStorageQueueTLog = std : : max ( worstStorageQueueTLog , queue ) ;
// Emergency stop: the not-yet-durable bytes exceed reported free space minus half of minFreeSpace.
// tpsLimit is forced to 0 and a warning is traced at most once every 5 seconds.
if ( tl . lastReply . bytesInput - tl . lastReply . bytesDurable > tl . lastReply . storageBytes . free - minFreeSpace / 2 ) {
if ( now ( ) - self - > lastWarning > 5.0 ) {
self - > lastWarning = now ( ) ;
2018-06-09 02:11:08 +08:00
TraceEvent ( SevWarnAlways , " RkTlogMinFreeSpaceZero " ) . detail ( " ReasonId " , tl . id ) ;
2017-05-26 04:48:44 +08:00
}
reasonID = tl . id ;
limitReason = limitReason_t : : log_server_min_free_space ;
2019-02-28 02:31:56 +08:00
limits . tpsLimit = 0.0 ;
2017-05-26 04:48:44 +08:00
}
double targetRateRatio = std : : min ( ( b + springBytes ) / ( double ) springBytes , 2.0 ) ;
// The storage-server version lag term takes over when it demands a larger ratio than the queue does.
if ( writeToReadLatencyLimit > targetRateRatio ) {
targetRateRatio = writeToReadLatencyLimit ;
tlogLimitReason = limitReason_t : : storage_server_readable_behind ;
}
double inputRate = tl . smoothInputBytes . smoothRate ( ) ;
if ( targetRateRatio > 0 ) {
2019-02-28 02:31:56 +08:00
double smoothedRate = std : : max ( tl . verySmoothDurableBytes . smoothRate ( ) , actualTps / SERVER_KNOBS - > MAX_TRANSACTIONS_PER_BYTE ) ;
2017-05-26 04:48:44 +08:00
double x = smoothedRate / ( inputRate * targetRateRatio ) ;
if ( targetRateRatio < .75 ) //< FIXME: KNOB for 2.0
x = std : : max ( x , 0.95 ) ;
2019-02-28 02:31:56 +08:00
double lim = actualTps * x ;
if ( lim < limits . tpsLimit ) {
limits . tpsLimit = lim ;
2017-05-26 04:48:44 +08:00
reasonID = tl . id ;
limitReason = tlogLimitReason ;
}
}
if ( inputRate > 0 ) {
// Don't let any tlogs use up its target bytes faster than its MVCC window!
double x = ( ( targetBytes - springBytes ) / ( ( ( ( double ) SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) / SERVER_KNOBS - > VERSIONS_PER_SECOND ) + 2.0 ) ) / inputRate ;
2019-02-28 02:31:56 +08:00
double lim = actualTps * x ;
if ( lim < limits . tpsLimit ) {
limits . tpsLimit = lim ;
2017-05-26 04:48:44 +08:00
reasonID = tl . id ;
limitReason = limitReason_t : : log_server_mvcc_write_bandwidth ;
}
}
}
2019-02-28 02:31:56 +08:00
// Never report a negative limit.
limits . tpsLimit = std : : max ( limits . tpsLimit , 0.0 ) ;
2017-05-26 04:48:44 +08:00
// In simulation with speedUpSimulation set, keep the limit at 100 or above.
if ( g_network - > isSimulated ( ) & & g_simulator . speedUpSimulation ) {
2019-02-28 02:31:56 +08:00
limits . tpsLimit = std : : max ( limits . tpsLimit , 100.0 ) ;
2017-05-26 04:48:44 +08:00
}
// Sum of storageBytes.used over all valid tlogs and storage servers (reported in the trace event only).
int64_t totalDiskUsageBytes = 0 ;
for ( auto & t : self - > tlogQueueInfo )
if ( t . value . valid )
totalDiskUsageBytes + = t . value . lastReply . storageBytes . used ;
for ( auto & s : self - > storageQueueInfo )
if ( s . value . valid )
totalDiskUsageBytes + = s . value . lastReply . storageBytes . used ;
2019-02-28 02:31:56 +08:00
// The published metric is capped at 1e6; limits.tpsLimit itself is not.
limits . tpsLimitMetric = std : : min ( limits . tpsLimit , 1e6 ) ;
limits . reasonMetric = limitReason ;
2017-05-26 04:48:44 +08:00
2019-02-28 02:31:56 +08:00
// Record the current time in *lastLimited when the released rate exceeds LAST_LIMITED_RATIO of this limit.
// This runs for whichever limit set was passed in.
if ( self - > smoothReleasedTransactions . smoothRate ( ) > SERVER_KNOBS - > LAST_LIMITED_RATIO * limits . tpsLimit ) {
2017-05-26 04:48:44 +08:00
( * self - > lastLimited ) = now ( ) ;
}
2019-02-28 02:31:56 +08:00
// Emit the summary trace event on a random subset of calls (presumably about 10% -- depends on random01's range).
if ( g_random - > random01 ( ) < 0.1 ) {
std : : string name = " RkUpdate " + limits . context ;
TraceEvent ( name . c_str ( ) )
. detail ( " TPSLimit " , limits . tpsLimit )
2017-05-26 04:48:44 +08:00
. detail ( " Reason " , limitReason )
. detail ( " ReasonServerID " , reasonID )
. detail ( " ReleasedTPS " , self - > smoothReleasedTransactions . smoothRate ( ) )
2019-02-28 02:31:56 +08:00
. detail ( " TPSBasis " , actualTps )
2017-05-26 04:48:44 +08:00
. detail ( " StorageServers " , sscount )
. detail ( " Proxies " , self - > proxy_transactionCountAndTime . size ( ) )
. detail ( " TLogs " , tlcount )
. detail ( " WorstFreeSpaceStorageServer " , worstFreeSpaceStorageServer )
. detail ( " WorstFreeSpaceTLog " , worstFreeSpaceTLog )
. detail ( " WorstStorageServerQueue " , worstStorageQueueStorageServer )
. detail ( " LimitingStorageServerQueue " , limitingStorageQueueStorageServer )
. detail ( " WorstTLogQueue " , worstStorageQueueTLog )
. detail ( " TotalDiskUsageBytes " , totalDiskUsageBytes )
. detail ( " WorstStorageServerVersionLag " , worstVersionLag )
. detail ( " LimitingStorageServerVersionLag " , limitingVersionLag )
2019-03-01 01:53:16 +08:00
. trackLatest ( name . c_str ( ) ) ;
2017-05-26 04:48:44 +08:00
}
}
2019-02-20 08:04:52 +08:00
// Keeps self->configuration in sync with the database configuration stored under configKeys.
// Each pass reads the whole configKeys range in a fresh transaction, loads it into
// self->configuration, then waits on a watch of moveKeysLockOwnerKey before reading again.
// The outer loop never exits, so this actor only ends through an error or cancellation.
ACTOR Future < Void > configurationMonitor ( Ratekeeper * self , Reference < AsyncVar < ServerDBInfo > > dbInfo ) {
state Database cx = openDBOnServer ( dbInfo , TaskDefaultEndpoint , true , true ) ;
loop {
// A new transaction object is created for every read-then-watch cycle.
state ReadYourWritesTransaction tr ( cx ) ;
loop {
try {
tr . setOption ( FDBTransactionOptions : : ACCESS_SYSTEM_KEYS ) ;
tr . setOption ( FDBTransactionOptions : : PRIORITY_SYSTEM_IMMEDIATE ) ;
Standalone < RangeResultRef > results = wait ( tr . getRange ( configKeys , CLIENT_KNOBS - > TOO_MANY ) ) ;
// The whole configuration is expected to fit in a single range read.
ASSERT ( ! results . more & & results . size ( ) < CLIENT_KNOBS - > TOO_MANY ) ;
self - > configuration . fromKeyValues ( ( VectorRef < KeyValueRef > ) results ) ;
// The watch is created before the commit and waited on after it; when it fires, break out
// so the outer loop re-reads the configuration.
state Future < Void > watchFuture = tr . watch ( moveKeysLockOwnerKey ) ;
wait ( tr . commit ( ) ) ;
wait ( watchFuture ) ;
break ;
} catch ( Error & e ) {
// Hand the error to the transaction's onError and, once that completes, retry the inner loop.
wait ( tr . onError ( e ) ) ;
}
}
}
}
2017-05-26 04:48:44 +08:00
// Main ratekeeper actor.
//
// dbInfo        - source of the local tlog interfaces; also passed to configurationMonitor.
// serverChanges - stream of storage server additions/removals, consumed by trackEachStorageServer.
// getRateInfo   - stream of rate requests; each is answered with the requester's share of the limits.
// lastLimited   - output location stored in self.lastLimited and written with now() whenever the
//                 released transaction rate exceeds LAST_LIMITED_RATIO of a limit.
//
// Returns Void only if the storage server tracking actor completes; errors from the tlog
// trackers (via `err`), from `track`, or from `configMonitor` propagate out of the choose.
ACTOR Future < Void > rateKeeper (
Reference < AsyncVar < ServerDBInfo > > dbInfo ,
PromiseStream < std : : pair < UID , Optional < StorageServerInterface > > > serverChanges ,
FutureStream < struct GetRateInfoRequest > getRateInfo ,
double * lastLimited )
{
state Ratekeeper self ;
state Future < Void > track = trackEachStorageServer ( & self , serverChanges . getFuture ( ) ) ;
// Initialized with Void(), so presumably already ready and the first rate update happens without delay -- confirm.
state Future < Void > timeout = Void ( ) ;
state std : : vector < Future < Void > > actors ;
state std : : vector < Future < Void > > tlogTrackers ;
state std : : vector < TLogInterface > tlogInterfs ;
// Receives errors from the tlog trackers through splitError.
state Promise < Void > err ;
2019-02-20 08:04:52 +08:00
state Future < Void > configMonitor = configurationMonitor ( & self , dbInfo ) ;
2017-05-26 04:48:44 +08:00
self . lastLimited = lastLimited ;
TraceEvent ( " RkTLogQueueSizeParameters " ) . detail ( " Target " , SERVER_KNOBS - > TARGET_BYTES_PER_TLOG ) . detail ( " Spring " , SERVER_KNOBS - > SPRING_BYTES_TLOG )
. detail ( " Rate " , ( SERVER_KNOBS - > TARGET_BYTES_PER_TLOG - SERVER_KNOBS - > SPRING_BYTES_TLOG ) / ( ( ( ( double ) SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) / SERVER_KNOBS - > VERSIONS_PER_SECOND ) + 2.0 ) ) ;
TraceEvent ( " RkStorageServerQueueSizeParameters " ) . detail ( " Target " , SERVER_KNOBS - > TARGET_BYTES_PER_STORAGE_SERVER ) . detail ( " Spring " , SERVER_KNOBS - > SPRING_BYTES_STORAGE_SERVER ) . detail ( " EBrake " , SERVER_KNOBS - > STORAGE_HARD_LIMIT_BYTES )
. detail ( " Rate " , ( SERVER_KNOBS - > TARGET_BYTES_PER_STORAGE_SERVER - SERVER_KNOBS - > SPRING_BYTES_STORAGE_SERVER ) / ( ( ( ( double ) SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) / SERVER_KNOBS - > VERSIONS_PER_SECOND ) + 2.0 ) ) ;
2018-05-30 01:51:23 +08:00
// Start one tracker per local tlog known at startup.
tlogInterfs = dbInfo - > get ( ) . logSystemConfig . allLocalLogs ( ) ;
2017-05-26 04:48:44 +08:00
for ( int i = 0 ; i < tlogInterfs . size ( ) ; i + + )
tlogTrackers . push_back ( splitError ( trackTLogQueueInfo ( & self , tlogInterfs [ i ] ) , err ) ) ;
loop {
choose {
2018-08-11 04:57:10 +08:00
when ( wait ( track ) ) { break ; }
when ( wait ( timeout ) ) {
2019-02-28 02:31:56 +08:00
// Periodic work: recompute both limit sets, then expire stale requester entries.
updateRate ( & self , self . normalLimits ) ;
updateRate ( & self , self . batchLimits ) ;
// updateRate performs the same comparison for the limit set it is given; this repeats it for the normal limits.
if ( self . smoothReleasedTransactions . smoothRate ( ) > SERVER_KNOBS - > LAST_LIMITED_RATIO * self . normalLimits . tpsLimit ) {
* self . lastLimited = now ( ) ;
}
2017-05-26 04:48:44 +08:00
// Forget requesters whose last request is more than one second old, so they stop
// counting toward the divisor used when answering rate requests.
double tooOld = now ( ) - 1.0 ;
for ( auto p = self . proxy_transactionCountAndTime . begin ( ) ; p ! = self . proxy_transactionCountAndTime . end ( ) ; ) {
if ( p - > second . second < tooOld )
p = self . proxy_transactionCountAndTime . erase ( p ) ;
else
+ + p ;
}
timeout = delayJittered ( SERVER_KNOBS - > METRIC_UPDATE_RATE ) ;
}
when ( GetRateInfoRequest req = waitNext ( getRateInfo ) ) {
GetRateInfoReply reply ;
// operator[] inserts the requester if it is new, so the map is non-empty for the divisions below.
auto & p = self . proxy_transactionCountAndTime [ req . requesterID ] ;
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
// Only add a delta once a previous positive total has been recorded for this requester.
if ( p . first > 0 )
self . smoothReleasedTransactions . addDelta ( req . totalReleasedTransactions - p . first ) ;
p . first = req . totalReleasedTransactions ;
p . second = now ( ) ;
2019-02-28 02:31:56 +08:00
// Each limit is split evenly across the requesters currently in the map.
reply . transactionRate = self . normalLimits . tpsLimit / self . proxy_transactionCountAndTime . size ( ) ;
reply . batchTransactionRate = self . batchLimits . tpsLimit / self . proxy_transactionCountAndTime . size ( ) ;
2017-05-26 04:48:44 +08:00
reply . leaseDuration = SERVER_KNOBS - > METRIC_UPDATE_RATE ;
req . reply . send ( reply ) ;
}
2018-08-11 04:57:10 +08:00
when ( wait ( err . getFuture ( ) ) ) { }
when ( wait ( dbInfo - > onChange ( ) ) ) {
2018-05-30 01:51:23 +08:00
// If the set of local tlogs changed, replace all tlog trackers with ones for the new set.
if ( tlogInterfs ! = dbInfo - > get ( ) . logSystemConfig . allLocalLogs ( ) ) {
tlogInterfs = dbInfo - > get ( ) . logSystemConfig . allLocalLogs ( ) ;
2017-05-26 04:48:44 +08:00
tlogTrackers = std : : vector < Future < Void > > ( ) ;
for ( int i = 0 ; i < tlogInterfs . size ( ) ; i + + )
tlogTrackers . push_back ( splitError ( trackTLogQueueInfo ( & self , tlogInterfs [ i ] ) , err ) ) ;
}
}
2019-02-20 08:04:52 +08:00
when ( wait ( configMonitor ) ) { }
2017-05-26 04:48:44 +08:00
}
}
return Void ( ) ;
}