2017-05-26 04:48:44 +08:00
/*
 * StorageMetrics.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// Included via StorageMetrics.h
# include "fdbclient/FDBTypes.h"
# include "fdbrpc/simulator.h"
# include "flow/UnitTest.h"
# include "fdbclient/StorageServerInterface.h"
# include "fdbclient/KeyRangeMap.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/Knobs.h"
2018-08-11 06:47:41 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
struct StorageMetricSample {
	// Sampled keys, each carrying the metric weight it stands for; queried through sumRange/sumTo/index.
	IndexedSet<Key, int64_t> sample;
	// Sampling granularity supplied at construction.
	int64_t metricUnitsPerSample;

	StorageMetricSample(int64_t metricUnitsPerSample) : metricUnitsPerSample(metricUnitsPerSample) {}

	// Estimated total metric of the sampled keys between keys.begin and keys.end.
	int64_t getEstimate(KeyRangeRef keys) const { return sample.sumRange(keys.begin, keys.end); }

	// Picks a key inside `range` that carves off roughly `offset` metric units, measured from range.begin when
	// `front` is true or from range.end otherwise. Falls back to a range boundary when no acceptable key is found.
	KeyRef splitEstimate(KeyRangeRef range, int64_t offset, bool front = true) const {
		// Cumulative weight at which the ideal split lies, measured from the chosen edge of the range.
		int64_t target;
		if (front) {
			target = sample.sumTo(sample.lower_bound(range.begin)) + offset;
		} else {
			target = sample.sumTo(sample.lower_bound(range.end)) - offset;
		}

		auto forward = sample.index(target);
		if (forward == sample.end() || *forward >= range.end) return range.end;
		if (!front && *forward <= range.begin) return range.begin;

		auto backward = forward;
		auto canStepBack = [&]() { return backward != sample.begin() && *backward > range.begin; };
		auto canStepForward = [&]() { return forward != sample.end() && *forward < range.end; };
		// A candidate is acceptable when the piece it splits off carries some sampled weight and the key is short.
		auto usable = [&](KeyRangeRef piece, KeyRef candidate) {
			return getEstimate(piece) > 0 && candidate.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT;
		};

		// Butterfly search: fan out from the starting sample, trying the gap just below `backward`
		// and then the gap just above `forward` on every round.
		while (canStepBack() || canStepForward()) {
			if (canStepBack()) {
				auto above = backward;
				backward.decrementNonEnd();
				KeyRef floor = backward != sample.begin() ? std::max<KeyRef>(*backward, range.begin) : range.begin;
				KeyRef candidate = keyBetween(KeyRangeRef(floor, *above));
				if (!front || usable(KeyRangeRef(range.begin, candidate), candidate)) return candidate;
			}
			if (canStepForward()) {
				auto next = forward;
				++next;
				KeyRef ceiling = next != sample.end() ? std::min<KeyRef>(*next, range.end) : range.end;
				KeyRef candidate = keyBetween(KeyRangeRef(*forward, ceiling));
				if (front || usable(KeyRangeRef(candidate, range.end), candidate)) return candidate;
				forward = next;
			}
		}

		// Neither direction produced an acceptable split key.
		TraceEvent(SevWarn, "CannotSplitLastSampleKey").detail("Range", range).detail("Offset", offset);
		return front ? range.end : range.begin;
	}
};
2018-10-06 13:09:58 +08:00
TEST_CASE("/fdbserver/StorageMetricSample/simple") {
	// Five sampled keys: each carries 1000 units except "Banana", which carries 2000.
	StorageMetricSample metricSample(1000);
	metricSample.sample.insert(LiteralStringRef("Apple"), 1000);
	metricSample.sample.insert(LiteralStringRef("Banana"), 2000);
	metricSample.sample.insert(LiteralStringRef("Cat"), 1000);
	metricSample.sample.insert(LiteralStringRef("Cathode"), 1000);
	metricSample.sample.insert(LiteralStringRef("Dog"), 1000);

	// A range ending at "D" leaves out "Dog"; extending it to "E" picks "Dog" up.
	ASSERT(metricSample.getEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D"))) == 5000);
	ASSERT(metricSample.getEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("E"))) == 6000);
	// Only "Banana" sorts between "B" and "C".
	ASSERT(metricSample.getEstimate(KeyRangeRef(LiteralStringRef("B"), LiteralStringRef("C"))) == 2000);

	// NOTE(review): disabled check. splitEstimate() returns keyBetween(...) of neighbouring samples, which may be
	// shorter than the sample key itself; confirm the expected value before re-enabling.
	//ASSERT(s.splitEstimate(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 3500) == LiteralStringRef("Cat"));

	return Void();
}
struct TransientStorageMetricSample : StorageMetricSample {
Deque < std : : pair < double , std : : pair < Key , int64_t > > > queue ;
TransientStorageMetricSample ( int64_t metricUnitsPerSample ) : StorageMetricSample ( metricUnitsPerSample ) { }
bool roll ( KeyRef key , int64_t metric ) {
2019-05-11 05:01:52 +08:00
return deterministicRandom ( ) - > random01 ( ) < ( double ) metric / metricUnitsPerSample ; //< SOMEDAY: Better randomInt64?
2017-05-26 04:48:44 +08:00
}
// Returns the sampled metric value (possibly 0, possibly increased by the sampling factor)
int64_t addAndExpire ( KeyRef key , int64_t metric , double expiration ) {
int64_t x = add ( key , metric ) ;
if ( x )
queue . push_back ( std : : make_pair ( expiration , std : : make_pair ( * sample . find ( key ) , - x ) ) ) ;
return x ;
}
//FIXME: both versions of erase are broken, because they do not remove items in the queue with will subtract a metric from the value sometime in the future
int64_t erase ( KeyRef key ) {
auto it = sample . find ( key ) ;
if ( it = = sample . end ( ) ) return 0 ;
int64_t x = sample . getMetric ( it ) ;
sample . erase ( it ) ;
return x ;
}
void erase ( KeyRangeRef keys ) {
sample . erase ( keys . begin , keys . end ) ;
}
void poll ( KeyRangeMap < vector < PromiseStream < StorageMetrics > > > & waitMap , StorageMetrics m ) {
double now = : : now ( ) ;
while ( queue . size ( ) & &
queue . front ( ) . first < = now )
{
KeyRef key = queue . front ( ) . second . first ;
int64_t delta = queue . front ( ) . second . second ;
ASSERT ( delta ! = 0 ) ;
2017-10-12 08:35:10 +08:00
if ( sample . addMetric ( key , delta ) = = 0 )
2017-05-26 04:48:44 +08:00
sample . erase ( key ) ;
StorageMetrics deltaM = m * delta ;
auto v = waitMap [ key ] ;
for ( int i = 0 ; i < v . size ( ) ; i + + ) {
TEST ( true ) ; // TransientStorageMetricSample poll update
v [ i ] . send ( deltaM ) ;
}
queue . pop_front ( ) ;
}
}
void poll ( ) {
double now = : : now ( ) ;
while ( queue . size ( ) & &
queue . front ( ) . first < = now )
{
KeyRef key = queue . front ( ) . second . first ;
int64_t delta = queue . front ( ) . second . second ;
ASSERT ( delta ! = 0 ) ;
2017-10-12 08:35:10 +08:00
if ( sample . addMetric ( key , delta ) = = 0 )
2017-05-26 04:48:44 +08:00
sample . erase ( key ) ;
queue . pop_front ( ) ;
}
}
private :
int64_t add ( KeyRef key , int64_t metric ) {
if ( ! metric ) return 0 ;
int64_t mag = metric < 0 ? - metric : metric ;
if ( mag < metricUnitsPerSample ) {
if ( ! roll ( key , mag ) )
return 0 ;
metric = metric < 0 ? - metricUnitsPerSample : metricUnitsPerSample ;
}
2017-10-12 08:35:10 +08:00
if ( sample . addMetric ( key , metric ) = = 0 )
2017-05-26 04:48:44 +08:00
sample . erase ( key ) ;
return metric ;
}
} ;
struct StorageServerMetrics {
KeyRangeMap < vector < PromiseStream < StorageMetrics > > > waitMetricsMap ;
StorageMetricSample byteSample ;
2019-09-27 05:42:54 +08:00
TransientStorageMetricSample iopsSample ,
bandwidthSample ; // FIXME: iops and bandwidth calculations are not effectively tested, since they aren't
2019-08-30 05:44:16 +08:00
// currently used by data distribution
2019-09-27 05:42:54 +08:00
TransientStorageMetricSample bytesReadSample ;
2017-05-26 04:48:44 +08:00
StorageServerMetrics ( )
2019-08-30 05:44:16 +08:00
: byteSample ( 0 ) , iopsSample ( SERVER_KNOBS - > IOPS_UNITS_PER_SAMPLE ) ,
bandwidthSample ( SERVER_KNOBS - > BANDWIDTH_UNITS_PER_SAMPLE ) ,
bytesReadSample ( SERVER_KNOBS - > BYTES_READ_UNITS_PER_SAMPLE ) { }
2017-05-26 04:48:44 +08:00
// Get the current estimated metrics for the given keys
StorageMetrics getMetrics ( KeyRangeRef const & keys ) {
StorageMetrics result ;
result . bytes = byteSample . getEstimate ( keys ) ;
result . bytesPerKSecond = bandwidthSample . getEstimate ( keys ) * SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
result . iosPerKSecond = iopsSample . getEstimate ( keys ) * SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
2019-08-30 05:44:16 +08:00
result . bytesReadPerKSecond =
bytesReadSample . getEstimate ( keys ) * SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
2017-05-26 04:48:44 +08:00
return result ;
}
// Called when metrics should change (IO for a given key)
// Notifies waiting WaitMetricsRequests through waitMetricsMap, and updates metricsAverageQueue and metricsSampleMap
void notify ( KeyRef key , StorageMetrics & metrics ) {
ASSERT ( metrics . bytes = = 0 ) ; // ShardNotifyMetrics
2019-11-08 02:38:48 +08:00
if ( g_network - > isSimulated ( ) ) {
2019-11-23 02:11:45 +08:00
TEST ( metrics . bytesPerKSecond ! = 0 ) ; // ShardNotifyMetrics
TEST ( metrics . iosPerKSecond ! = 0 ) ; // ShardNotifyMetrics
2019-11-08 02:38:48 +08:00
TEST ( metrics . bytesReadPerKSecond ! = 0 ) ; // ShardNotifyMetrics
}
2017-05-26 04:48:44 +08:00
double expire = now ( ) + SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL ;
StorageMetrics notifyMetrics ;
if ( metrics . bytesPerKSecond )
notifyMetrics . bytesPerKSecond = bandwidthSample . addAndExpire ( key , metrics . bytesPerKSecond , expire ) * SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
if ( metrics . iosPerKSecond )
notifyMetrics . iosPerKSecond = iopsSample . addAndExpire ( key , metrics . iosPerKSecond , expire ) * SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
2019-08-30 05:44:16 +08:00
if ( metrics . bytesReadPerKSecond )
notifyMetrics . bytesReadPerKSecond = bytesReadSample . addAndExpire ( key , metrics . bytesReadPerKSecond , expire ) *
SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
2017-05-26 04:48:44 +08:00
if ( ! notifyMetrics . allZero ( ) ) {
auto & v = waitMetricsMap [ key ] ;
for ( int i = 0 ; i < v . size ( ) ; i + + ) {
2019-11-22 07:51:42 +08:00
if ( g_network - > isSimulated ( ) ) {
TEST ( true ) ;
}
2019-11-08 02:38:48 +08:00
// ShardNotifyMetrics
2017-05-26 04:48:44 +08:00
v [ i ] . send ( notifyMetrics ) ;
}
}
}
2019-11-28 11:26:22 +08:00
// Due to the fact that read sampling will be called on all reads, use this specialized function to avoid overhead
// around branch misses and unnecessary stack allocation which eventually addes up under heavy load.
void notifyBytesReadPerKSecond ( KeyRef key , int64_t in ) {
double expire = now ( ) + SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL ;
int64_t bytesReadPerKSecond =
bytesReadSample . addAndExpire ( key , in , expire ) * SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
if ( bytesReadPerKSecond > 0 ) {
StorageMetrics notifyMetrics ;
notifyMetrics . bytesReadPerKSecond = bytesReadPerKSecond ;
auto & v = waitMetricsMap [ key ] ;
for ( int i = 0 ; i < v . size ( ) ; i + + ) {
TEST ( true ) ; // ShardNotifyMetrics
v [ i ] . send ( notifyMetrics ) ;
}
}
}
2017-05-26 04:48:44 +08:00
// Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest
// Should not be called for keys past allKeys.end
void notifyBytes ( RangeMap < Key , std : : vector < PromiseStream < StorageMetrics > > , KeyRangeRef > : : Iterator shard , int64_t bytes ) {
ASSERT ( shard . end ( ) < = allKeys . end ) ;
StorageMetrics notifyMetrics ;
notifyMetrics . bytes = bytes ;
for ( int i = 0 ; i < shard . value ( ) . size ( ) ; i + + ) {
TEST ( true ) ; // notifyBytes
shard . value ( ) [ i ] . send ( notifyMetrics ) ;
}
}
// Called by StorageServerDisk when the size of a key in byteSample changes, to notify WaitMetricsRequest
void notifyBytes ( KeyRef key , int64_t bytes ) {
if ( key > = allKeys . end ) //Do not notify on changes to internal storage server state
return ;
notifyBytes ( waitMetricsMap . rangeContaining ( key ) , bytes ) ;
}
// Called when a range of keys becomes unassigned (and therefore not readable), to notify waiting WaitMetricsRequests (also other types of wait
// requests in the future?)
void notifyNotReadable ( KeyRangeRef keys ) {
auto rs = waitMetricsMap . intersectingRanges ( keys ) ;
for ( auto r = rs . begin ( ) ; r ! = rs . end ( ) ; + + r ) {
auto & v = r - > value ( ) ;
TEST ( v . size ( ) ) ; // notifyNotReadable() sending errors to intersecting ranges
for ( int n = 0 ; n < v . size ( ) ; n + + )
v [ n ] . sendError ( wrong_shard_server ( ) ) ;
}
}
// Called periodically (~1 sec intervals) to remove older IOs from the averages
// Removes old entries from metricsAverageQueue, updates metricsSampleMap accordingly, and notifies
// WaitMetricsRequests through waitMetricsMap.
void poll ( ) {
{ StorageMetrics m ; m . bytesPerKSecond = SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ; bandwidthSample . poll ( waitMetricsMap , m ) ; }
{ StorageMetrics m ; m . iosPerKSecond = SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ; iopsSample . poll ( waitMetricsMap , m ) ; }
2019-08-30 05:44:16 +08:00
{
StorageMetrics m ;
m . bytesReadPerKSecond = SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS ;
bytesReadSample . poll ( waitMetricsMap , m ) ;
}
2017-05-26 04:48:44 +08:00
// bytesSample doesn't need polling because we never call addExpire() on it
}
//static void waitMetrics( StorageServerMetrics* const& self, WaitMetricsRequest const& req );
2017-10-04 13:01:40 +08:00
// This function can run on untrusted user data. We must validate all divisions carefully.
2017-05-26 04:48:44 +08:00
KeyRef getSplitKey ( int64_t remaining , int64_t estimated , int64_t limits , int64_t used , int64_t infinity ,
2018-01-09 10:20:22 +08:00
bool isLastShard , StorageMetricSample & sample , double divisor , KeyRef const & lastKey , KeyRef const & key , bool hasUsed )
2017-11-14 03:38:34 +08:00
{
ASSERT ( remaining > = 0 ) ;
ASSERT ( limits > 0 ) ;
ASSERT ( divisor > 0 ) ;
2017-05-26 04:48:44 +08:00
if ( limits < infinity / 2 ) {
int64_t expectedSize ;
2017-10-04 13:01:40 +08:00
if ( isLastShard | | remaining > estimated ) {
double remaining_divisor = ( double ( remaining ) / limits ) + 0.5 ;
expectedSize = remaining / remaining_divisor ;
} else {
2017-11-14 03:38:34 +08:00
// If we are here, then estimated >= remaining >= 0
2017-10-04 13:01:40 +08:00
double estimated_divisor = ( double ( estimated ) / limits ) + 0.5 ;
expectedSize = remaining / estimated_divisor ;
}
2017-05-26 04:48:44 +08:00
if ( remaining > expectedSize ) {
// This does the conversion from native units to bytes using the divisor.
double offset = ( expectedSize - used ) / divisor ;
if ( offset < = 0 )
2018-01-09 10:20:22 +08:00
return hasUsed ? lastKey : key ;
2019-05-11 05:01:52 +08:00
return sample . splitEstimate ( KeyRangeRef ( lastKey , key ) , offset * ( ( 1.0 - SERVER_KNOBS - > SPLIT_JITTER_AMOUNT ) + 2 * deterministicRandom ( ) - > random01 ( ) * SERVER_KNOBS - > SPLIT_JITTER_AMOUNT ) ) ;
2017-05-26 04:48:44 +08:00
}
}
return key ;
}
void splitMetrics ( SplitMetricsRequest req ) {
try {
SplitMetricsReply reply ;
KeyRef lastKey = req . keys . begin ;
StorageMetrics used = req . used ;
StorageMetrics estimated = req . estimated ;
StorageMetrics remaining = getMetrics ( req . keys ) + used ;
2019-03-19 06:03:43 +08:00
//TraceEvent("SplitMetrics").detail("Begin", req.keys.begin).detail("End", req.keys.end).detail("Remaining", remaining.bytes).detail("Used", used.bytes);
2017-05-26 04:48:44 +08:00
while ( true ) {
if ( remaining . bytes < 2 * SERVER_KNOBS - > MIN_SHARD_BYTES )
break ;
KeyRef key = req . keys . end ;
2018-01-09 10:20:22 +08:00
bool hasUsed = used . bytes ! = 0 | | used . bytesPerKSecond ! = 0 | | used . iosPerKSecond ! = 0 ;
2017-05-26 04:48:44 +08:00
key = getSplitKey ( remaining . bytes , estimated . bytes , req . limits . bytes , used . bytes ,
2018-01-09 10:20:22 +08:00
req . limits . infinity , req . isLastShard , byteSample , 1 , lastKey , key , hasUsed ) ;
2017-05-26 04:48:44 +08:00
if ( used . bytes < SERVER_KNOBS - > MIN_SHARD_BYTES )
key = std : : max ( key , byteSample . splitEstimate ( KeyRangeRef ( lastKey , req . keys . end ) , SERVER_KNOBS - > MIN_SHARD_BYTES - used . bytes ) ) ;
key = getSplitKey ( remaining . iosPerKSecond , estimated . iosPerKSecond , req . limits . iosPerKSecond , used . iosPerKSecond ,
2018-01-09 10:20:22 +08:00
req . limits . infinity , req . isLastShard , iopsSample , SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS , lastKey , key , hasUsed ) ;
2017-05-26 04:48:44 +08:00
key = getSplitKey ( remaining . bytesPerKSecond , estimated . bytesPerKSecond , req . limits . bytesPerKSecond , used . bytesPerKSecond ,
2018-01-09 10:20:22 +08:00
req . limits . infinity , req . isLastShard , bandwidthSample , SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS , lastKey , key , hasUsed ) ;
ASSERT ( key ! = lastKey | | hasUsed ) ;
2017-05-26 04:48:44 +08:00
if ( key = = req . keys . end )
break ;
reply . splits . push_back_deep ( reply . splits . arena ( ) , key ) ;
StorageMetrics diff = ( getMetrics ( KeyRangeRef ( lastKey , key ) ) + used ) ;
remaining - = diff ;
estimated - = diff ;
used = StorageMetrics ( ) ;
lastKey = key ;
}
reply . used = getMetrics ( KeyRangeRef ( lastKey , req . keys . end ) ) + used ;
req . reply . send ( reply ) ;
} catch ( Error & e ) {
req . reply . sendError ( e ) ;
}
}
2020-04-04 10:04:43 +08:00
void getStorageMetrics ( GetStorageMetricsRequest req , StorageBytes sb , double bytesInputRate , int64_t versionLag , double lastUpdate ) {
2019-07-26 07:27:32 +08:00
GetStorageMetricsReply rep ;
2017-05-26 04:48:44 +08:00
// SOMEDAY: make bytes dynamic with hard disk space
rep . load = getMetrics ( allKeys ) ;
2019-08-15 04:57:22 +08:00
if ( sb . free < 1e9 ) {
2017-05-26 04:48:44 +08:00
TraceEvent ( SevWarn , " PhysicalDiskMetrics " )
2019-08-15 04:48:35 +08:00
. suppressFor ( 60.0 )
. detail ( " Free " , sb . free )
. detail ( " Total " , sb . total )
. detail ( " Available " , sb . available )
. detail ( " Load " , rep . load . bytes ) ;
2019-07-26 07:27:32 +08:00
}
2017-05-26 04:48:44 +08:00
2020-02-21 03:21:03 +08:00
rep . available . bytes = sb . available ;
rep . available . iosPerKSecond = 10e6 ;
rep . available . bytesPerKSecond = 100e9 ;
2020-02-22 11:09:16 +08:00
rep . available . bytesReadPerKSecond = 100e9 ;
2017-05-26 04:48:44 +08:00
rep . capacity . bytes = sb . total ;
rep . capacity . iosPerKSecond = 10e6 ;
rep . capacity . bytesPerKSecond = 100e9 ;
2019-08-30 05:44:16 +08:00
rep . capacity . bytesReadPerKSecond = 100e9 ;
2017-05-26 04:48:44 +08:00
2019-07-26 07:27:32 +08:00
rep . bytesInputRate = bytesInputRate ;
2020-04-04 10:04:43 +08:00
rep . versionLag = versionLag ;
rep . lastUpdate = lastUpdate ;
2020-02-04 10:14:26 +08:00
2017-05-26 04:48:44 +08:00
req . reply . send ( rep ) ;
}
Future < Void > waitMetrics ( WaitMetricsRequest req , Future < Void > delay ) ;
2020-04-22 02:00:19 +08:00
// Given a read hot shard, this function will divide the shard into chunks and find those chunks whose
2019-12-20 08:02:42 +08:00
// readBytes/sizeBytes exceeds the `readDensityRatio`. Please make sure to run unit tests
// `StorageMetricsSampleTests.txt` after change made.
2020-01-13 07:30:36 +08:00
std : : vector < KeyRangeRef > getReadHotRanges ( KeyRangeRef shard , double readDensityRatio , int64_t baseChunkSize ,
int64_t minShardReadBandwidthPerKSeconds ) {
2019-12-20 08:02:42 +08:00
std : : vector < KeyRangeRef > toReturn ;
2020-01-13 07:30:36 +08:00
double shardSize = ( double ) byteSample . getEstimate ( shard ) ;
int64_t shardReadBandwidth = bytesReadSample . getEstimate ( shard ) ;
if ( shardReadBandwidth * SERVER_KNOBS - > STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS < =
minShardReadBandwidthPerKSeconds ) {
return toReturn ;
}
2020-05-01 02:31:59 +08:00
if ( shardSize < = baseChunkSize ) {
2019-12-19 09:11:34 +08:00
// Shard is small, use it as is
2020-05-01 02:31:59 +08:00
if ( bytesReadSample . getEstimate ( shard ) > ( readDensityRatio * shardSize ) ) {
2020-01-13 07:30:36 +08:00
toReturn . push_back ( shard ) ;
}
2019-12-19 09:11:34 +08:00
return toReturn ;
}
KeyRef beginKey = shard . begin ;
IndexedSet < Key , int64_t > : : iterator endKey =
2020-05-01 02:31:59 +08:00
byteSample . sample . index ( byteSample . sample . sumTo ( byteSample . sample . lower_bound ( beginKey ) ) + baseChunkSize ) ;
2019-12-19 09:11:34 +08:00
while ( endKey ! = byteSample . sample . end ( ) ) {
2020-05-12 06:56:51 +08:00
if ( * endKey > shard . end ) {
endKey = byteSample . sample . lower_bound ( shard . end ) ;
if ( * endKey = = beginKey ) {
// No need to increment endKey since otherwise it would stuck here forever.
break ;
}
}
2019-12-19 09:11:34 +08:00
if ( * endKey = = beginKey ) {
2020-05-01 02:31:59 +08:00
+ + endKey ;
2019-12-19 09:11:34 +08:00
continue ;
}
2020-05-01 02:31:59 +08:00
if ( bytesReadSample . getEstimate ( KeyRangeRef ( beginKey , * endKey ) ) >
( readDensityRatio * std : : max ( baseChunkSize , byteSample . getEstimate ( KeyRangeRef ( beginKey , * endKey ) ) ) ) ) {
2019-12-19 09:11:34 +08:00
auto range = KeyRangeRef ( beginKey , * endKey ) ;
if ( ! toReturn . empty ( ) & & toReturn . back ( ) . end = = range . begin ) {
2020-04-22 02:00:19 +08:00
// in case two consecutive chunks both are over the ratio, merge them.
auto updatedTail = KeyRangeRef ( toReturn . back ( ) . begin , * endKey ) ;
2019-12-19 09:11:34 +08:00
toReturn . pop_back ( ) ;
toReturn . push_back ( updatedTail ) ;
} else {
toReturn . push_back ( range ) ;
}
}
beginKey = * endKey ;
2020-05-01 02:31:59 +08:00
endKey = byteSample . sample . index ( byteSample . sample . sumTo ( byteSample . sample . lower_bound ( beginKey ) ) +
baseChunkSize ) ;
2019-12-19 09:11:34 +08:00
}
return toReturn ;
}
2019-12-20 08:02:42 +08:00
void getReadHotRanges ( ReadHotSubRangeRequest req ) {
ReadHotSubRangeReply reply ;
std : : vector < KeyRangeRef > v = getReadHotRanges ( req . keys , SERVER_KNOBS - > SHARD_MAX_READ_DENSITY_RATIO ,
2020-01-13 07:30:36 +08:00
SERVER_KNOBS - > READ_HOT_SUB_RANGE_CHUNK_SIZE ,
SERVER_KNOBS - > SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS ) ;
2020-01-16 02:22:19 +08:00
reply . readHotRanges = VectorRef < KeyRangeRef > ( v . data ( ) , v . size ( ) ) ;
2019-12-20 08:02:42 +08:00
req . reply . send ( reply ) ;
}
2017-05-26 04:48:44 +08:00
private :
static void collapse ( KeyRangeMap < int > & map , KeyRef const & key ) {
auto range = map . rangeContaining ( key ) ;
if ( range = = map . ranges ( ) . begin ( ) | | range = = map . ranges ( ) . end ( ) ) return ;
int value = range - > value ( ) ;
auto prev = range ; - - prev ;
if ( prev - > value ( ) ! = value ) return ;
KeyRange keys = KeyRangeRef ( prev - > begin ( ) , range - > end ( ) ) ;
map . insert ( keys , value ) ;
}
static void add ( KeyRangeMap < int > & map , KeyRangeRef const & keys , int delta ) {
auto rs = map . modify ( keys ) ;
for ( auto r = rs . begin ( ) ; r ! = rs . end ( ) ; + + r )
r - > value ( ) + = delta ;
collapse ( map , keys . begin ) ;
collapse ( map , keys . end ) ;
}
} ;
2019-12-19 09:11:34 +08:00
TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/simple") {
	// Every sampled weight is expressed in multiples of the bytes-read sampling unit.
	int64_t unit = SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE;
	StorageServerMetrics metrics;

	// Read traffic.
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Apple"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Banana"), 2000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Cat"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Cathode"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Dog"), 1000 * unit);

	// Stored bytes.
	metrics.byteSample.sample.insert(LiteralStringRef("A"), 20 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Absolute"), 80 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Apple"), 1000 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Bah"), 20 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Banana"), 80 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Bob"), 200 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("But"), 100 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * unit);

	// With 200-unit chunks and a density ratio of 2.0, exactly one hot range is expected: [Bah, Bob).
	std::vector<KeyRangeRef> hotRanges =
	    metrics.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 2.0, 200 * unit, 0);

	ASSERT(hotRanges.size() == 1);
	ASSERT(hotRanges.at(0).begin == LiteralStringRef("Bah"));
	ASSERT(hotRanges.at(0).end == LiteralStringRef("Bob"));

	return Void();
}
TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/moreThanOneRange") {
	// Every sampled weight is expressed in multiples of the bytes-read sampling unit.
	int64_t unit = SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE;
	StorageServerMetrics metrics;

	// Read traffic.
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Apple"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Banana"), 2000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Cat"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Cathode"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Dog"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Final"), 2000 * unit);

	// Stored bytes.
	metrics.byteSample.sample.insert(LiteralStringRef("A"), 20 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Absolute"), 80 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Apple"), 1000 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Bah"), 20 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Banana"), 80 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Bob"), 200 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("But"), 100 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Dah"), 300 * unit);

	// Two separate hot ranges are expected: [Bah, Bob) and [Cat, Dah).
	std::vector<KeyRangeRef> hotRanges =
	    metrics.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 2.0, 200 * unit, 0);

	ASSERT(hotRanges.size() == 2);
	ASSERT(hotRanges.at(0).begin == LiteralStringRef("Bah"));
	ASSERT(hotRanges.at(0).end == LiteralStringRef("Bob"));
	ASSERT(hotRanges.at(1).begin == LiteralStringRef("Cat") && hotRanges.at(1).end == LiteralStringRef("Dah"));

	return Void();
}
TEST_CASE("/fdbserver/StorageMetricSample/readHotDetect/consecutiveRanges") {
	// Every sampled weight is expressed in multiples of the bytes-read sampling unit.
	int64_t unit = SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE;
	StorageServerMetrics metrics;

	// Read traffic.
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Apple"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Banana"), 2000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Bucket"), 2000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Cat"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Cathode"), 1000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Dog"), 5000 * unit);
	metrics.bytesReadSample.sample.insert(LiteralStringRef("Final"), 2000 * unit);

	// Stored bytes.
	metrics.byteSample.sample.insert(LiteralStringRef("A"), 20 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Absolute"), 80 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Apple"), 1000 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Bah"), 20 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Banana"), 80 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Bob"), 200 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("But"), 100 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * unit);
	metrics.byteSample.sample.insert(LiteralStringRef("Dah"), 300 * unit);

	// Adjacent hot chunks [Bah, Bob) and [Bob, But) are expected to be merged into [Bah, But); [Cat, Dah) stays separate.
	std::vector<KeyRangeRef> hotRanges =
	    metrics.getReadHotRanges(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("D")), 2.0, 200 * unit, 0);

	ASSERT(hotRanges.size() == 2);
	ASSERT(hotRanges.at(0).begin == LiteralStringRef("Bah"));
	ASSERT(hotRanges.at(0).end == LiteralStringRef("But"));
	ASSERT(hotRanges.at(1).begin == LiteralStringRef("Cat") && hotRanges.at(1).end == LiteralStringRef("Dah"));

	return Void();
}
2017-05-26 04:48:44 +08:00
// Result of deciding whether a key-value pair belongs in a byte sample, together with its size information.
struct ByteSampleInfo {
	// Whether the key-value pair should be included in the byte sample.
	bool inSample;

	// Actual size of the key-value pair.
	int64_t size;

	// Size recorded for the sample (max of bytesPerSample, size).
	int64_t sampledSize;
};

// Decides whether a key-value pair should be included in a byte sample and reports its size information.
ByteSampleInfo isKeyValueInSample(KeyValueRef keyValue);
2018-08-11 06:47:41 +08:00
# include "flow/unactorcompiler.h"