Added metrics for read hot key detection

This commit is contained in:
Xin Dong 2019-08-29 14:44:16 -07:00
parent c9097cca18
commit 12293d5497
5 changed files with 106 additions and 22 deletions

View File

@ -284,35 +284,40 @@ struct StorageMetrics {
int64_t bytes; // total storage
int64_t bytesPerKSecond; // network bandwidth (average over 10s)
int64_t iosPerKSecond;
int64_t bytesReadPerKSecond;
static const int64_t infinity = 1LL<<60;
StorageMetrics() : bytes(0), bytesPerKSecond(0), iosPerKSecond(0) {}
StorageMetrics() : bytes(0), bytesPerKSecond(0), iosPerKSecond(0), bytesReadPerKSecond(0) {}
bool allLessOrEqual( const StorageMetrics& rhs ) const {
return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond;
return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond &&
bytesReadPerKSecond <= rhs.bytesReadPerKSecond;
}
void operator += ( const StorageMetrics& rhs ) {
bytes += rhs.bytes;
bytesPerKSecond += rhs.bytesPerKSecond;
iosPerKSecond += rhs.iosPerKSecond;
bytesReadPerKSecond += rhs.bytesReadPerKSecond;
}
void operator -= ( const StorageMetrics& rhs ) {
bytes -= rhs.bytes;
bytesPerKSecond -= rhs.bytesPerKSecond;
iosPerKSecond -= rhs.iosPerKSecond;
bytesReadPerKSecond -= rhs.bytesReadPerKSecond;
}
template <class F>
void operator *= ( F f ) {
bytes *= f;
bytesPerKSecond *= f;
iosPerKSecond *= f;
bytesReadPerKSecond *= f;
}
bool allZero() const { return !bytes && !bytesPerKSecond && !iosPerKSecond; }
bool allZero() const { return !bytes && !bytesPerKSecond && !iosPerKSecond && !bytesReadPerKSecond; }
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, bytes, bytesPerKSecond, iosPerKSecond);
serializer(ar, bytes, bytesPerKSecond, iosPerKSecond, bytesReadPerKSecond);
}
void negate() { operator*=(-1.0); }
@ -322,11 +327,13 @@ struct StorageMetrics {
template <class F> StorageMetrics operator * ( F f ) const { StorageMetrics x(*this); x*=f; return x; }
bool operator == ( StorageMetrics const& rhs ) const {
return bytes == rhs.bytes && bytesPerKSecond == rhs.bytesPerKSecond && iosPerKSecond == rhs.iosPerKSecond;
return bytes == rhs.bytes && bytesPerKSecond == rhs.bytesPerKSecond && iosPerKSecond == rhs.iosPerKSecond &&
bytesReadPerKSecond == rhs.bytesReadPerKSecond;
}
std::string toString() const {
return format("Bytes: %lld, BPerKSec: %lld, iosPerKSec: %lld", bytes, bytesPerKSecond, iosPerKSecond);
return format("Bytes: %lld, BPerKSec: %lld, iosPerKSec: %lld, BReadPerKSec: %lld", bytes, bytesPerKSecond,
iosPerKSecond, bytesReadPerKSecond);
}
};

View File

@ -33,6 +33,8 @@ enum BandwidthStatus {
BandwidthStatusHigh
};
enum ReadBandwithStatus { ReadBandwithStatusNormal, ReadBandwithStatusHigh };
BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
if( metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC )
return BandwidthStatusHigh;
@ -42,6 +44,13 @@ BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
return BandwidthStatusNormal;
}
ReadBandwithStatus getReadBandwidthStatus(StorageMetrics const& metrics) {
if (metrics.bytesReadPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC)
return ReadBandwithStatusHigh;
else
return ReadBandwithStatusNormal;
}
ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstimate, Reference<AsyncVar<Optional<int64_t>>> maxShardSize ) {
state int64_t lastDbSize = 0;
state int64_t granularity = g_network->isSimulated() ?
@ -136,26 +145,35 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
(int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
}
<<<<<<< HEAD
ACTOR Future<Void> trackShardBytes(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize)
{
=======
ACTOR Future<Void> trackShardBytes(DataDistributionTracker* self, KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardMetrics,
bool addToSizeEstimate = true) {
>>>>>>> Added metrics for read hot key detection
wait( delay( 0, TaskPriority::DataDistribution ) );
/*TraceEvent("TrackShardBytesStarting")
.detail("TrackerID", trackerID)
.detail("Keys", keys)
.detail("TrackedBytesInitiallyPresent", shardSize->get().present())
.detail("StartingSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
.detail("StartingMerges", shardSize->get().present() ? shardSize->get().get().merges : 0);*/
.detail("TrackedBytesInitiallyPresent", shardMetrics->get().present())
.detail("StartingMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0)
.detail("StartingMerges", shardMetrics->get().present() ? shardMetrics->get().get().merges : 0);*/
state ReadBandwithStatus readBandwithStatus;
try {
loop {
ShardSizeBounds bounds;
if( shardSize->get().present() ) {
auto bytes = shardSize->get().get().bytes;
auto bandwidthStatus = getBandwidthStatus( shardSize->get().get() );
if (shardMetrics->get().present()) {
auto bytes = shardMetrics->get().get().bytes;
auto bandwidthStatus = getBandwidthStatus(shardMetrics->get().get());
auto newReadBandwithStatus = getReadBandwidthStatus(shardMetrics->get().get());
bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
bounds.permittedError.bytes = bytes * 0.1;
@ -171,15 +189,35 @@ ACTOR Future<Void> trackShardBytes(
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
} else
} else {
ASSERT( false );
}
// handle read bandkwith status
if (newReadBandwithStatus != readBandwithStatus) {
TraceEvent("ReadBandwithStatusChanged")
.detail("from", readBandwithStatus == ReadBandwithStatusNormal ? "Normal" : "High")
.detail("to", newReadBandwithStatus == ReadBandwithStatusNormal ? "Normal" : "High");
readBandwithStatus = newReadBandwithStatus;
}
if (newReadBandwithStatus == ReadBandwithStatusNormal) {
TEST(true);
bounds.max.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC;
bounds.min.bytesReadPerKSecond = 0;
} else if (newReadBandwithStatus == ReadBandwithStatusHigh) {
TEST(true);
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
bounds.min.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC;
} else {
ASSERT(false);
}
} else {
bounds.max.bytes = -1;
bounds.min.bytes = -1;
bounds.permittedError.bytes = -1;
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = 0;
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
bounds.min.bytesReadPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
}
@ -191,6 +229,7 @@ ACTOR Future<Void> trackShardBytes(
StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );
/*TraceEvent("ShardSizeUpdate")
<<<<<<< HEAD
.detail("Keys", keys)
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
@ -211,6 +250,25 @@ ACTOR Future<Void> trackShardBytes(
}
shardSize->set( metrics );
=======
.detail("Keys", keys)
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwithStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
.detail("ShardMetricsPresent", shardMetrics->get().present())
.detail("OldShardMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0)
.detail("TrackerID", trackerID);*/
if (shardMetrics->get().present() && addToSizeEstimate)
self->dbSizeEstimate->set(self->dbSizeEstimate->get() + metrics.bytes -
shardMetrics->get().get().bytes);
shardMetrics->set(metrics);
>>>>>>> Added metrics for read hot key detection
}
} catch( Error &e ) {
if (e.code() != error_code_actor_cancelled)

View File

@ -117,6 +117,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( KEY_SERVER_SHARD_BYTES, 500000000 );
bool buggifySmallBandwidthSplit = randomize && BUGGIFY;
init( SHARD_MAX_BYTES_PER_KSEC, 1LL*1000000*1000 ); if( buggifySmallBandwidthSplit ) SHARD_MAX_BYTES_PER_KSEC = 10LL*1000*1000;
init( SHARD_MAX_BYTES_READ_PER_KSEC, 1LL*1000000*1000 ); if( buggifySmallBandwidthSplit ) SHARD_MAX_BYTES_PER_KSEC = 10LL*1000*1000;
/* 10*1MB/sec * 1000sec/ksec
Shards with more than this bandwidth will be split immediately.
For a large shard (100MB), splitting it costs ~100MB of work or about 10MB/sec over a 10 sec sampling window.
@ -435,6 +436,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( SPLIT_JITTER_AMOUNT, 0.05 ); if( randomize && BUGGIFY ) SPLIT_JITTER_AMOUNT = 0.2;
init( IOPS_UNITS_PER_SAMPLE, 10000 * 1000 / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 100 );
init( BANDWIDTH_UNITS_PER_SAMPLE, SHARD_MIN_BYTES_PER_KSEC / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 25 );
init( BYTES_READ_UNITS_PER_SAMPLE, 100); // Effectively weight up read on small or non-existing key/values.
//Storage Server
init( STORAGE_LOGGING_DELAY, 5.0 );

View File

@ -113,6 +113,7 @@ public:
int64_t SHARD_MAX_BYTES_PER_KSEC, // Shards with more than this bandwidth will be split immediately
SHARD_MIN_BYTES_PER_KSEC, // Shards with more than this bandwidth will not be merged
SHARD_SPLIT_BYTES_PER_KSEC; // When splitting a shard, it is split into pieces with less than this bandwidth
int64_t SHARD_MAX_BYTES_READ_PER_KSEC;
double STORAGE_METRIC_TIMEOUT;
double METRIC_DELAY;
double ALL_DATA_REMOVED_DELAY;
@ -371,6 +372,7 @@ public:
double SPLIT_JITTER_AMOUNT;
int64_t IOPS_UNITS_PER_SAMPLE;
int64_t BANDWIDTH_UNITS_PER_SAMPLE;
int64_t BYTES_READ_UNITS_PER_SAMPLE;
//Storage Server
double STORAGE_LOGGING_DELAY;

View File

@ -184,12 +184,14 @@ private:
struct StorageServerMetrics {
KeyRangeMap< vector< PromiseStream< StorageMetrics > > > waitMetricsMap;
StorageMetricSample byteSample;
TransientStorageMetricSample iopsSample, bandwidthSample; // FIXME: iops and bandwidth calculations are not effectively tested, since they aren't currently used by data distribution
TransientStorageMetricSample iopsSample, bandwidthSample,
bytesReadSample; // FIXME: iops and bandwidth calculations are not effectively tested, since they aren't
// currently used by data distribution
StorageServerMetrics()
: byteSample( 0 ), iopsSample( SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE ), bandwidthSample( SERVER_KNOBS->BANDWIDTH_UNITS_PER_SAMPLE )
{
}
: byteSample(0), iopsSample(SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE),
bandwidthSample(SERVER_KNOBS->BANDWIDTH_UNITS_PER_SAMPLE),
bytesReadSample(SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE) {}
// Get the current estimated metrics for the given keys
StorageMetrics getMetrics( KeyRangeRef const& keys ) {
@ -197,6 +199,8 @@ struct StorageServerMetrics {
result.bytes = byteSample.getEstimate( keys );
result.bytesPerKSecond = bandwidthSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
result.iosPerKSecond = iopsSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
result.bytesReadPerKSecond =
bytesReadSample.getEstimate(keys) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
return result;
}
@ -206,6 +210,7 @@ struct StorageServerMetrics {
ASSERT (metrics.bytes == 0); // ShardNotifyMetrics
TEST (metrics.bytesPerKSecond != 0); // ShardNotifyMetrics
TEST (metrics.iosPerKSecond != 0); // ShardNotifyMetrics
TEST(metrics.bytesReadPerKSecond != 0); // ShardNotifyMetrics
double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;
@ -215,6 +220,9 @@ struct StorageServerMetrics {
notifyMetrics.bytesPerKSecond = bandwidthSample.addAndExpire( key, metrics.bytesPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
if (metrics.iosPerKSecond)
notifyMetrics.iosPerKSecond = iopsSample.addAndExpire( key, metrics.iosPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
if (metrics.bytesReadPerKSecond)
notifyMetrics.bytesReadPerKSecond = bytesReadSample.addAndExpire(key, metrics.bytesReadPerKSecond, expire) *
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
if (!notifyMetrics.allZero()) {
auto& v = waitMetricsMap[key];
for(int i=0; i<v.size(); i++) {
@ -263,6 +271,11 @@ struct StorageServerMetrics {
void poll() {
{ StorageMetrics m; m.bytesPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; bandwidthSample.poll(waitMetricsMap, m); }
{ StorageMetrics m; m.iosPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; iopsSample.poll(waitMetricsMap, m); }
{
StorageMetrics m;
m.bytesReadPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
bytesReadSample.poll(waitMetricsMap, m);
}
// bytesSample doesn't need polling because we never call addExpire() on it
}
@ -360,10 +373,12 @@ struct StorageServerMetrics {
rep.free.bytes = sb.free;
rep.free.iosPerKSecond = 10e6;
rep.free.bytesPerKSecond = 100e9;
rep.free.bytesReadPerKSecond = 100e9;
rep.capacity.bytes = sb.total;
rep.capacity.iosPerKSecond = 10e6;
rep.capacity.bytesPerKSecond = 100e9;
rep.capacity.bytesReadPerKSecond = 100e9;
rep.bytesInputRate = bytesInputRate;