Address review comments from Meng
This commit is contained in:
parent
ff8810ab45
commit
76eb1c4376
|
@ -121,10 +121,8 @@ struct DataDistributionTracker {
|
|||
}
|
||||
};
|
||||
|
||||
void restartShardTrackers(
|
||||
DataDistributionTracker* self,
|
||||
KeyRangeRef keys,
|
||||
Optional<ShardMetrics> startingSize = Optional<ShardMetrics>());
|
||||
void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys,
|
||||
Optional<ShardMetrics> startingMetrics = Optional<ShardMetrics>());
|
||||
|
||||
// Gets the permitted size and IO bounds for a shard. A shard that starts at allKeys.begin
|
||||
// (i.e. '') will have a permitted size of 0, since the database can contain no data.
|
||||
|
@ -166,11 +164,8 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
|
|||
(int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> trackShardBytes(
|
||||
DataDistributionTracker* self,
|
||||
KeyRange keys,
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics)
|
||||
{
|
||||
ACTOR Future<Void> trackShardMetrics(DataDistributionTracker* self, KeyRange keys,
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics) {
|
||||
state BandwidthStatus bandwidthStatus = shardMetrics->get().present() ? getBandwidthStatus( shardMetrics->get().get().metrics ) : BandwidthStatusNormal;
|
||||
state double lastLowBandwidthStartTime = shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now();
|
||||
state int shardCount = shardMetrics->get().present() ? shardMetrics->get().get().shardCount : 1;
|
||||
|
@ -178,7 +173,7 @@ ACTOR Future<Void> trackShardBytes(
|
|||
|
||||
wait( delay( 0, TaskPriority::DataDistribution ) );
|
||||
|
||||
/*TraceEvent("TrackShardBytesStarting")
|
||||
/*TraceEvent("trackShardMetricsStarting")
|
||||
.detail("TrackerID", trackerID)
|
||||
.detail("Keys", keys)
|
||||
.detail("TrackedBytesInitiallyPresent", shardMetrics->get().present())
|
||||
|
@ -723,7 +718,7 @@ ACTOR Future<Void> shardTracker(
|
|||
}
|
||||
}
|
||||
|
||||
void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingSize ) {
|
||||
void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingMetrics) {
|
||||
auto ranges = self->shards.getAffectedRangesAfterInsertion( keys, ShardTrackedData() );
|
||||
for(int i=0; i<ranges.size(); i++) {
|
||||
if( !ranges[i].value.trackShard.isValid() && ranges[i].begin != keys.begin ) {
|
||||
|
@ -733,24 +728,24 @@ void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Opti
|
|||
continue;
|
||||
}
|
||||
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize( new AsyncVar<Optional<ShardMetrics>>() );
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics(new AsyncVar<Optional<ShardMetrics>>());
|
||||
|
||||
// For the case where the new tracker will take over at the boundaries of current shard(s)
|
||||
// we can use the old size if it is available. This will be the case when merging shards.
|
||||
if( startingSize.present() ) {
|
||||
if (startingMetrics.present()) {
|
||||
ASSERT( ranges.size() == 1 );
|
||||
/*TraceEvent("ShardTrackerSizePreset", self->distributorId)
|
||||
.detail("Keys", keys)
|
||||
.detail("Size", startingSize.get().metrics.bytes)
|
||||
.detail("Merges", startingSize.get().merges);*/
|
||||
.detail("Keys", keys)
|
||||
.detail("Size", startingMetrics.get().metrics.bytes)
|
||||
.detail("Merges", startingMetrics.get().merges);*/
|
||||
TEST( true ); // shardTracker started with trackedBytes already set
|
||||
shardSize->set( startingSize );
|
||||
shardMetrics->set(startingMetrics);
|
||||
}
|
||||
|
||||
ShardTrackedData data;
|
||||
data.stats = shardSize;
|
||||
data.trackShard = shardTracker( self, ranges[i], shardSize );
|
||||
data.trackBytes = trackShardBytes( self, ranges[i], shardSize );
|
||||
data.stats = shardMetrics;
|
||||
data.trackShard = shardTracker(self, ranges[i], shardMetrics);
|
||||
data.trackBytes = trackShardMetrics(self, ranges[i], shardMetrics);
|
||||
self->shards.insert( ranges[i], data );
|
||||
}
|
||||
}
|
||||
|
|
|
@ -410,7 +410,7 @@ struct StorageServerMetrics {
|
|||
|
||||
Future<Void> waitMetrics(WaitMetricsRequest req, Future<Void> delay);
|
||||
|
||||
// Given a read hot shard, this function will divid the shard into chunks and find those chunks whose
|
||||
// Given a read hot shard, this function will divide the shard into chunks and find those chunks whose
|
||||
// readBytes/sizeBytes exceeds the `readDensityRatio`. Please make sure to run unit tests
|
||||
// `StorageMetricsSampleTests.txt` after change made.
|
||||
std::vector<KeyRangeRef> getReadHotRanges(KeyRangeRef shard, double readDensityRatio, int64_t baseChunkSize,
|
||||
|
@ -441,12 +441,11 @@ struct StorageServerMetrics {
|
|||
chunkSize);
|
||||
continue;
|
||||
}
|
||||
if (bytesReadSample.getEstimate(KeyRangeRef(beginKey, *endKey)) / chunkSize > readDensityRatio) {
|
||||
if (bytesReadSample.getEstimate(KeyRangeRef(beginKey, *endKey)) > (readDensityRatio * chunkSize)) {
|
||||
auto range = KeyRangeRef(beginKey, *endKey);
|
||||
if (!toReturn.empty() && toReturn.back().end == range.begin) {
|
||||
auto updatedTail =
|
||||
KeyRangeRef(toReturn.back().begin,
|
||||
*endKey); // in case two consecutive chunks both are over the ratio, merge them.
|
||||
// in case two consecutive chunks both are over the ratio, merge them.
|
||||
auto updatedTail = KeyRangeRef(toReturn.back().begin, *endKey);
|
||||
toReturn.pop_back();
|
||||
toReturn.push_back(updatedTail);
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue