diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h
index 99a673b15c..2505bf5a31 100644
--- a/fdbclient/StorageServerInterface.h
+++ b/fdbclient/StorageServerInterface.h
@@ -284,35 +284,40 @@ struct StorageMetrics {
 	int64_t bytes; // total storage
 	int64_t bytesPerKSecond; // network bandwidth (average over 10s)
 	int64_t iosPerKSecond;
+	int64_t bytesReadPerKSecond;
 
 	static const int64_t infinity = 1LL<<60;
 
-	StorageMetrics() : bytes(0), bytesPerKSecond(0), iosPerKSecond(0) {}
+	StorageMetrics() : bytes(0), bytesPerKSecond(0), iosPerKSecond(0), bytesReadPerKSecond(0) {}
 
 	bool allLessOrEqual( const StorageMetrics& rhs ) const {
-		return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond;
+		return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond &&
+		       bytesReadPerKSecond <= rhs.bytesReadPerKSecond;
 	}
 	void operator += ( const StorageMetrics& rhs ) {
 		bytes += rhs.bytes;
 		bytesPerKSecond += rhs.bytesPerKSecond;
 		iosPerKSecond += rhs.iosPerKSecond;
+		bytesReadPerKSecond += rhs.bytesReadPerKSecond;
 	}
 	void operator -= ( const StorageMetrics& rhs ) {
 		bytes -= rhs.bytes;
 		bytesPerKSecond -= rhs.bytesPerKSecond;
 		iosPerKSecond -= rhs.iosPerKSecond;
+		bytesReadPerKSecond -= rhs.bytesReadPerKSecond;
 	}
 	template <class F> void operator *= ( F f ) {
 		bytes *= f;
 		bytesPerKSecond *= f;
 		iosPerKSecond *= f;
+		bytesReadPerKSecond *= f;
 	}
-	bool allZero() const { return !bytes && !bytesPerKSecond && !iosPerKSecond; }
+	bool allZero() const { return !bytes && !bytesPerKSecond && !iosPerKSecond && !bytesReadPerKSecond; }
 
 	template <class Ar>
 	void serialize( Ar& ar ) {
-		serializer(ar, bytes, bytesPerKSecond, iosPerKSecond);
+		serializer(ar, bytes, bytesPerKSecond, iosPerKSecond, bytesReadPerKSecond);
 	}
 
 	void negate() { operator*=(-1.0); }
@@ -322,11 +327,13 @@ struct StorageMetrics {
 	template <class F> StorageMetrics operator * ( F f ) const { StorageMetrics x(*this); x*=f; return x; }
 
 	bool operator == ( StorageMetrics const& rhs ) const {
-		return bytes == rhs.bytes && bytesPerKSecond == rhs.bytesPerKSecond && iosPerKSecond == rhs.iosPerKSecond;
+		return bytes == rhs.bytes && bytesPerKSecond == rhs.bytesPerKSecond && iosPerKSecond == rhs.iosPerKSecond &&
+		       bytesReadPerKSecond == rhs.bytesReadPerKSecond;
 	}
 
 	std::string toString() const {
-		return format("Bytes: %lld, BPerKSec: %lld, iosPerKSec: %lld", bytes, bytesPerKSecond, iosPerKSecond);
+		return format("Bytes: %lld, BPerKSec: %lld, iosPerKSec: %lld, BReadPerKSec: %lld", bytes, bytesPerKSecond,
+		              iosPerKSecond, bytesReadPerKSecond);
 	}
 };
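The read counter is added as a fourth dimension of StorageMetrics, so shard aggregation, subtraction, and bound comparison pick it up with no further plumbing. A trimmed, self-contained mirror of that semantics (illustrative C++ only, not FDB code; FDB's format() helper is replaced with printf):

#include <cstdint>
#include <cstdio>

// Minimal mirror of StorageMetrics: four independent dimensions, compared pointwise.
struct Metrics {
	int64_t bytes = 0, bytesPerKSecond = 0, iosPerKSecond = 0, bytesReadPerKSecond = 0;
	static const int64_t infinity = 1LL << 60;

	void operator+=(const Metrics& rhs) {
		bytes += rhs.bytes;
		bytesPerKSecond += rhs.bytesPerKSecond;
		iosPerKSecond += rhs.iosPerKSecond;
		bytesReadPerKSecond += rhs.bytesReadPerKSecond; // the new dimension rides along
	}
	bool allLessOrEqual(const Metrics& rhs) const {
		return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond &&
		       iosPerKSecond <= rhs.iosPerKSecond && bytesReadPerKSecond <= rhs.bytesReadPerKSecond;
	}
};

int main() {
	Metrics shard, max;
	shard.bytes = 50'000'000;
	shard.bytesReadPerKSecond = 2'000'000'000; // read-hot shard
	max.bytes = max.bytesPerKSecond = max.iosPerKSecond = Metrics::infinity;
	max.bytesReadPerKSecond = 1'000'000'000; // read-bandwidth cap
	// Exceeding the cap on any single dimension breaks allLessOrEqual, which is what
	// lets waitStorageMetrics-style bound checks wake a tracker on read heat alone.
	printf("within bounds: %s\n", shard.allLessOrEqual(max) ? "yes" : "no"); // prints "no"
	return 0;
}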
diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp
index b690cd639c..474f2e8dc1 100644
--- a/fdbserver/DataDistributionTracker.actor.cpp
+++ b/fdbserver/DataDistributionTracker.actor.cpp
@@ -33,6 +33,8 @@ enum BandwidthStatus {
 	BandwidthStatusHigh
 };
 
+enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh };
+
 BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
 	if( metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC )
 		return BandwidthStatusHigh;
@@ -42,6 +44,13 @@ BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
 	return BandwidthStatusNormal;
 }
 
+ReadBandwidthStatus getReadBandwidthStatus(StorageMetrics const& metrics) {
+	if (metrics.bytesReadPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC)
+		return ReadBandwidthStatusHigh;
+	else
+		return ReadBandwidthStatusNormal;
+}
+
 ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstimate, Reference<AsyncVar<Optional<int64_t>>> maxShardSize ) {
 	state int64_t lastDbSize = 0;
 	state int64_t granularity = g_network->isSimulated() ?
@@ -136,26 +145,35 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
 	                (int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
 }
 
-ACTOR Future<Void> trackShardBytes( DataDistributionTracker* self, KeyRange keys, Reference<AsyncVar<Optional<StorageMetrics>>> shardSize) {
+ACTOR Future<Void> trackShardBytes(DataDistributionTracker* self, KeyRange keys,
+                                   Reference<AsyncVar<Optional<StorageMetrics>>> shardMetrics,
+                                   bool addToSizeEstimate = true) {
 	wait( delay( 0, TaskPriority::DataDistribution ) );
 
 	/*TraceEvent("TrackShardBytesStarting")
-		.detail("TrackerID", trackerID)
-		.detail("Keys", keys)
-		.detail("TrackedBytesInitiallyPresent", shardSize->get().present())
-		.detail("StartingSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
-		.detail("StartingMerges", shardSize->get().present() ? shardSize->get().get().merges : 0);*/
+	    .detail("TrackerID", trackerID)
+	    .detail("Keys", keys)
+	    .detail("TrackedBytesInitiallyPresent", shardMetrics->get().present())
+	    .detail("StartingMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0)
+	    .detail("StartingMerges", shardMetrics->get().present() ? shardMetrics->get().get().merges : 0);*/
 
+	state ReadBandwidthStatus readBandwidthStatus = ReadBandwidthStatusNormal;
 	try {
 		loop {
 			ShardSizeBounds bounds;
-			if( shardSize->get().present() ) {
-				auto bytes = shardSize->get().get().bytes;
-				auto bandwidthStatus = getBandwidthStatus( shardSize->get().get() );
+			if (shardMetrics->get().present()) {
+				auto bytes = shardMetrics->get().get().bytes;
+				auto bandwidthStatus = getBandwidthStatus(shardMetrics->get().get());
+				auto newReadBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get());
+
 				bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
 				bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
 				bounds.permittedError.bytes = bytes * 0.1;
@@ -171,15 +189,35 @@ ACTOR Future<Void> trackShardBytes(
 					bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
 					bounds.min.bytesPerKSecond = 0;
 					bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
-				} else
+				} else {
 					ASSERT( false );
-
+				}
+
+				// Handle read bandwidth status
+				if (newReadBandwidthStatus != readBandwidthStatus) {
+					TraceEvent("ReadBandwidthStatusChanged")
+					    .detail("From", readBandwidthStatus == ReadBandwidthStatusNormal ? "Normal" : "High")
+					    .detail("To", newReadBandwidthStatus == ReadBandwidthStatusNormal ? "Normal" : "High");
+					readBandwidthStatus = newReadBandwidthStatus;
+				}
+				if (newReadBandwidthStatus == ReadBandwidthStatusNormal) {
+					TEST(true); // shard's read bandwidth status is normal
+					bounds.max.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC;
+					bounds.min.bytesReadPerKSecond = 0;
+				} else if (newReadBandwidthStatus == ReadBandwidthStatusHigh) {
+					TEST(true); // shard's read bandwidth status is high
+					bounds.max.bytesReadPerKSecond = bounds.max.infinity;
+					bounds.min.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC;
+				} else {
+					ASSERT(false);
+				}
 			} else {
 				bounds.max.bytes = -1;
 				bounds.min.bytes = -1;
 				bounds.permittedError.bytes = -1;
 				bounds.max.bytesPerKSecond = bounds.max.infinity;
 				bounds.min.bytesPerKSecond = 0;
+				bounds.max.bytesReadPerKSecond = bounds.max.infinity;
+				bounds.min.bytesReadPerKSecond = 0;
 				bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
 			}
 
@@ -191,6 +229,7 @@ ACTOR Future<Void> trackShardBytes(
 			StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );
 
 			/*TraceEvent("ShardSizeUpdate")
-				.detail("Keys", keys)
-				.detail("UpdatedSize", metrics.metrics.bytes)
-				.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
-				.detail("BandwithStatus", getBandwidthStatus(metrics))
-				.detail("BytesLower", bounds.min.bytes)
-				.detail("BytesUpper", bounds.max.bytes)
-				.detail("BandwidthLower", bounds.min.bytesPerKSecond)
-				.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
-				.detail("ShardSizePresent", shardSize->get().present())
-				.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
-				.detail("TrackerID", trackerID);*/
-
-			if( shardSize->get().present() )
-				self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardSize->get().get().bytes );
-
-			shardSize->set( metrics );
+			    .detail("Keys", keys)
+			    .detail("UpdatedSize", metrics.metrics.bytes)
+			    .detail("Bandwidth", metrics.metrics.bytesPerKSecond)
+			    .detail("BandwidthStatus", getBandwidthStatus(metrics))
+			    .detail("BytesLower", bounds.min.bytes)
+			    .detail("BytesUpper", bounds.max.bytes)
+			    .detail("BandwidthLower", bounds.min.bytesPerKSecond)
+			    .detail("BandwidthUpper", bounds.max.bytesPerKSecond)
+			    .detail("ShardMetricsPresent", shardMetrics->get().present())
+			    .detail("OldShardMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0)
+			    .detail("TrackerID", trackerID);*/
+
+			if (shardMetrics->get().present() && addToSizeEstimate)
+				self->dbSizeEstimate->set(self->dbSizeEstimate->get() + metrics.bytes -
+				                          shardMetrics->get().get().bytes);
+
+			shardMetrics->set(metrics);
 		}
 	} catch( Error &e ) {
 		if (e.code() != error_code_actor_cancelled)
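Note how the tracker flips the [min, max] read-bandwidth window around the knob: in the Normal band it sleeps until reads climb above SHARD_MAX_BYTES_READ_PER_KSEC, and in the High band until they fall back below it, so the status-change trace fires only on band crossings. A self-contained sketch of that banding under the default knob value added in Knobs.cpp below (the observed figures are made up):

#include <cstdint>
#include <cstdio>

enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh };

// Default (non-BUGGIFY) SHARD_MAX_BYTES_READ_PER_KSEC from this patch: 1e9 bytes/ksec.
const int64_t kShardMaxBytesReadPerKSec = 1LL * 1000000 * 1000;

ReadBandwidthStatus getReadBandwidthStatus(int64_t bytesReadPerKSecond) {
	return bytesReadPerKSecond > kShardMaxBytesReadPerKSec ? ReadBandwidthStatusHigh
	                                                       : ReadBandwidthStatusNormal;
}

int main() {
	ReadBandwidthStatus status = ReadBandwidthStatusNormal;
	const int64_t observed[] = { 5'000'000, 2'000'000'000, 1'200'000'000, 900'000 };
	for (int64_t bytesReadPerKSecond : observed) {
		ReadBandwidthStatus next = getReadBandwidthStatus(bytesReadPerKSecond);
		if (next != status) { // analogous to the ReadBandwidthStatusChanged trace
			printf("%s -> %s at %lld bytes/ksec\n",
			       status == ReadBandwidthStatusNormal ? "Normal" : "High",
			       next == ReadBandwidthStatusNormal ? "Normal" : "High",
			       (long long)bytesReadPerKSecond);
			status = next;
		}
	}
	return 0;
}

This prints Normal -> High at 2000000000 and High -> Normal at 900000; the 1.2e9 reading stays inside the High band and produces no event.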
"Normal" : "High"); + readBandwithStatus = newReadBandwithStatus; + } + if (newReadBandwithStatus == ReadBandwithStatusNormal) { + TEST(true); + bounds.max.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC; + bounds.min.bytesReadPerKSecond = 0; + } else if (newReadBandwithStatus == ReadBandwithStatusHigh) { + TEST(true); + bounds.max.bytesReadPerKSecond = bounds.max.infinity; + bounds.min.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC; + } else { + ASSERT(false); + } } else { bounds.max.bytes = -1; bounds.min.bytes = -1; bounds.permittedError.bytes = -1; bounds.max.bytesPerKSecond = bounds.max.infinity; bounds.min.bytesPerKSecond = 0; + bounds.max.bytesReadPerKSecond = bounds.max.infinity; + bounds.min.bytesReadPerKSecond = 0; bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity; } @@ -191,6 +229,7 @@ ACTOR Future trackShardBytes( StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) ); /*TraceEvent("ShardSizeUpdate") +<<<<<<< HEAD .detail("Keys", keys) .detail("UpdatedSize", metrics.metrics.bytes) .detail("Bandwidth", metrics.metrics.bytesPerKSecond) @@ -211,6 +250,25 @@ ACTOR Future trackShardBytes( } shardSize->set( metrics ); +======= + .detail("Keys", keys) + .detail("UpdatedSize", metrics.metrics.bytes) + .detail("Bandwidth", metrics.metrics.bytesPerKSecond) + .detail("BandwithStatus", getBandwidthStatus(metrics)) + .detail("BytesLower", bounds.min.bytes) + .detail("BytesUpper", bounds.max.bytes) + .detail("BandwidthLower", bounds.min.bytesPerKSecond) + .detail("BandwidthUpper", bounds.max.bytesPerKSecond) + .detail("ShardMetricsPresent", shardMetrics->get().present()) + .detail("OldShardMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0) + .detail("TrackerID", trackerID);*/ + + if (shardMetrics->get().present() && addToSizeEstimate) + self->dbSizeEstimate->set(self->dbSizeEstimate->get() + metrics.bytes - + shardMetrics->get().get().bytes); + + shardMetrics->set(metrics); +>>>>>>> Added metrics for read hot key detection } } catch( Error &e ) { if (e.code() != error_code_actor_cancelled) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 91e426ba03..aa28fdfc06 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -117,6 +117,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( KEY_SERVER_SHARD_BYTES, 500000000 ); bool buggifySmallBandwidthSplit = randomize && BUGGIFY; init( SHARD_MAX_BYTES_PER_KSEC, 1LL*1000000*1000 ); if( buggifySmallBandwidthSplit ) SHARD_MAX_BYTES_PER_KSEC = 10LL*1000*1000; + init( SHARD_MAX_BYTES_READ_PER_KSEC, 1LL*1000000*1000 ); if( buggifySmallBandwidthSplit ) SHARD_MAX_BYTES_PER_KSEC = 10LL*1000*1000; /* 10*1MB/sec * 1000sec/ksec Shards with more than this bandwidth will be split immediately. For a large shard (100MB), splitting it costs ~100MB of work or about 10MB/sec over a 10 sec sampling window. @@ -435,6 +436,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( SPLIT_JITTER_AMOUNT, 0.05 ); if( randomize && BUGGIFY ) SPLIT_JITTER_AMOUNT = 0.2; init( IOPS_UNITS_PER_SAMPLE, 10000 * 1000 / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 100 ); init( BANDWIDTH_UNITS_PER_SAMPLE, SHARD_MIN_BYTES_PER_KSEC / STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS / 25 ); + init( BYTES_READ_UNITS_PER_SAMPLE, 100); // Effectively weight up read on small or non-existing key/values. 
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index 6264299b6f..b8daf6eb7a 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -113,6 +113,7 @@ public:
 	int64_t SHARD_MAX_BYTES_PER_KSEC, // Shards with more than this bandwidth will be split immediately
 	    SHARD_MIN_BYTES_PER_KSEC, // Shards with more than this bandwidth will not be merged
 	    SHARD_SPLIT_BYTES_PER_KSEC; // When splitting a shard, it is split into pieces with less than this bandwidth
+	int64_t SHARD_MAX_BYTES_READ_PER_KSEC; // Shards with more read bandwidth than this are considered read-hot
 	double STORAGE_METRIC_TIMEOUT;
 	double METRIC_DELAY;
 	double ALL_DATA_REMOVED_DELAY;
@@ -362,15 +363,16 @@ public:
 	double INITIAL_DURABILITY_LAG_MULTIPLIER;
 	double DURABILITY_LAG_REDUCTION_RATE;
 	double DURABILITY_LAG_INCREASE_RATE;
-	
+
 	double STORAGE_SERVER_LIST_FETCH_TIMEOUT;
-	
+
 	//Storage Metrics
 	double STORAGE_METRICS_AVERAGE_INTERVAL;
 	double STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
 	double SPLIT_JITTER_AMOUNT;
 	int64_t IOPS_UNITS_PER_SAMPLE;
 	int64_t BANDWIDTH_UNITS_PER_SAMPLE;
+	int64_t BYTES_READ_UNITS_PER_SAMPLE;
 
 	//Storage Server
 	double STORAGE_LOGGING_DELAY;
diff --git a/fdbserver/StorageMetrics.actor.h b/fdbserver/StorageMetrics.actor.h
index 327f3ed431..010c9f62ba 100644
--- a/fdbserver/StorageMetrics.actor.h
+++ b/fdbserver/StorageMetrics.actor.h
@@ -184,12 +184,14 @@ private:
 struct StorageServerMetrics {
 	KeyRangeMap< vector< PromiseStream< StorageMetrics > > > waitMetricsMap;
 	StorageMetricSample byteSample;
-	TransientStorageMetricSample iopsSample, bandwidthSample; // FIXME: iops and bandwidth calculations are not effectively tested, since they aren't currently used by data distribution
+	TransientStorageMetricSample iopsSample, bandwidthSample,
+	    bytesReadSample; // FIXME: iops and bandwidth calculations are not effectively tested, since they aren't
+	                     // currently used by data distribution
 
 	StorageServerMetrics()
-	  : byteSample( 0 ), iopsSample( SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE ), bandwidthSample( SERVER_KNOBS->BANDWIDTH_UNITS_PER_SAMPLE )
-	{
-	}
+	  : byteSample(0), iopsSample(SERVER_KNOBS->IOPS_UNITS_PER_SAMPLE),
+	    bandwidthSample(SERVER_KNOBS->BANDWIDTH_UNITS_PER_SAMPLE),
+	    bytesReadSample(SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE) {}
 
 	// Get the current estimated metrics for the given keys
 	StorageMetrics getMetrics( KeyRangeRef const& keys ) {
@@ -197,6 +199,8 @@ struct StorageServerMetrics {
 		result.bytes = byteSample.getEstimate( keys );
 		result.bytesPerKSecond = bandwidthSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
 		result.iosPerKSecond = iopsSample.getEstimate( keys ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
+		result.bytesReadPerKSecond =
+		    bytesReadSample.getEstimate(keys) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
 		return result;
 	}
 
@@ -206,6 +210,7 @@ struct StorageServerMetrics {
 		ASSERT (metrics.bytes == 0); // ShardNotifyMetrics
 		TEST (metrics.bytesPerKSecond != 0); // ShardNotifyMetrics
 		TEST (metrics.iosPerKSecond != 0); // ShardNotifyMetrics
+		TEST (metrics.bytesReadPerKSecond != 0); // ShardNotifyMetrics
 
 		double expire = now() + SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL;
 
@@ -215,6 +220,9 @@ struct StorageServerMetrics {
 		if (metrics.bytesPerKSecond)
 			notifyMetrics.bytesPerKSecond = bandwidthSample.addAndExpire( key, metrics.bytesPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
 		if (metrics.iosPerKSecond)
 			notifyMetrics.iosPerKSecond = iopsSample.addAndExpire( key, metrics.iosPerKSecond, expire ) * SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
+		if (metrics.bytesReadPerKSecond)
+			notifyMetrics.bytesReadPerKSecond = bytesReadSample.addAndExpire(key, metrics.bytesReadPerKSecond, expire) *
+			                                    SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
 		if (!notifyMetrics.allZero()) {
 			auto& v = waitMetricsMap[key];
 			for(int i=0; i<v.size(); i++)
 				v[i].send( notifyMetrics );
 		}
 	}
 
 	void poll() {
 		{ StorageMetrics m; m.bytesPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; bandwidthSample.poll(waitMetricsMap, m); }
 		{ StorageMetrics m; m.iosPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS; iopsSample.poll(waitMetricsMap, m); }
+		{
+			StorageMetrics m;
+			m.bytesReadPerKSecond = SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS;
+			bytesReadSample.poll(waitMetricsMap, m);
+		}
 		// bytesSample doesn't need polling because we never call addExpire() on it
 	}
@@ -360,10 +373,12 @@ struct StorageServerMetrics {
 
 		rep.free.bytes = sb.free;
 		rep.free.iosPerKSecond = 10e6;
 		rep.free.bytesPerKSecond = 100e9;
+		rep.free.bytesReadPerKSecond = 100e9;
 
 		rep.capacity.bytes = sb.total;
 		rep.capacity.iosPerKSecond = 10e6;
 		rep.capacity.bytesPerKSecond = 100e9;
+		rep.capacity.bytesReadPerKSecond = 100e9;
 
 		rep.bytesInputRate = bytesInputRate;
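All three transient samples are converted to per-kilosecond rates with the same STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS factor, so the tracker's knob comparisons stay unit-consistent. A worked sketch of the conversion done in getMetrics() above (the 120-second averaging window is an assumed knob value, not something set in this diff):

#include <cstdio>

int main() {
	// Assumed knobs: STORAGE_METRICS_AVERAGE_INTERVAL = 120.0 seconds, with
	// STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS = 1000 / interval.
	const double perKSeconds = 1000.0 / 120.0;

	// Suppose bytesReadSample.getEstimate(keys) attributes ~240 MB of reads
	// to a range over the current averaging window:
	const double sampleEstimate = 240e6;

	// getMetrics() scales that to the per-ksecond units which
	// SHARD_MAX_BYTES_READ_PER_KSEC (1e9 by default) is compared against.
	const double bytesReadPerKSecond = sampleEstimate * perKSeconds;
	printf("bytesReadPerKSecond = %.0f -> %s\n", bytesReadPerKSecond,
	       bytesReadPerKSecond > 1e9 ? "read-hot" : "normal");
	return 0;
}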