fix: trackShardBytes was called with the incorrect range, resulting in incorrect shard sizes

Reduced the size of the shard tracker actors by removing unnecessary state variables. Because we have a large number of these actors, the extra state variables add up to a lot of memory.
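
For context, a minimal standalone sketch of why this saves memory (simplified, hypothetical types and counts): Flow's actor compiler turns every parameter and `state` variable of an ACTOR into a member of a heap-allocated state object that lives as long as the actor, so with one tracker per shard, each extra member is paid once per shard.

#include <cstdint>
#include <cstdio>

struct UID { uint64_t part[2]; };                     // 16 bytes, as in Flow

// Stand-ins for the compiler-generated actor state, before and after this commit.
// (The removed `state Transaction tr` is also a per-actor member; it is omitted
// here because its size depends on internal client types.)
struct TrackerStateBefore {
	void* self; void* keys; void* shardSize;          // members the commit keeps
	UID trackerID;                                    // parameter removed by this commit
	double enough;                                    // HasBeenTrueFor member removed by this commit
};
struct TrackerStateAfter {
	void* self; void* keys; void* shardSize;
};

int main() {
	const long shards = 500000;                       // assumed shard count, for illustration only
	long saved = shards * (long)(sizeof(TrackerStateBefore) - sizeof(TrackerStateAfter));
	printf("~%ld MB saved across %ld shard trackers\n", saved / (1024 * 1024), shards);
}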
Evan Tschannen 2018-11-02 13:03:01 -07:00
parent ad98acf795
commit e68c07ae35
1 changed file with 62 additions and 77 deletions


@@ -138,11 +138,8 @@ ACTOR Future<Void> trackShardBytes(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
UID trackerID,
bool addToSizeEstimate = true)
{
state Transaction tr(self->cx);
Void _ = wait( delay( 0, TaskDataDistribution ) );
/*TraceEvent("TrackShardBytesStarting")
@@ -154,65 +151,61 @@ ACTOR Future<Void> trackShardBytes(
try {
loop {
try {
state ShardSizeBounds bounds;
if( shardSize->get().present() ) {
auto bytes = shardSize->get().get().bytes;
auto bandwidthStatus = getBandwidthStatus( shardSize->get().get() );
bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
bounds.permittedError.bytes = bytes * 0.1;
if( bandwidthStatus == BandwidthStatusNormal ) { // Not high or low
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusHigh ) { // > 10MB/sec for 100MB shard, proportionally lower for smaller shard, > 200KB/sec no matter what
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusLow ) { // < 10KB/sec
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
} else
ASSERT( false );
} else {
bounds.max.bytes = -1;
bounds.min.bytes = -1;
bounds.permittedError.bytes = -1;
ShardSizeBounds bounds;
if( shardSize->get().present() ) {
auto bytes = shardSize->get().get().bytes;
auto bandwidthStatus = getBandwidthStatus( shardSize->get().get() );
bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
bounds.permittedError.bytes = bytes * 0.1;
if( bandwidthStatus == BandwidthStatusNormal ) { // Not high or low
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusHigh ) { // > 10MB/sec for 100MB shard, proportionally lower for smaller shard, > 200KB/sec no matter what
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusLow ) { // < 10KB/sec
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
} else
ASSERT( false );
bounds.max.iosPerKSecond = bounds.max.infinity;
bounds.min.iosPerKSecond = 0;
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );
/*TraceEvent("ShardSizeUpdate")
.detail("Keys", printable(keys))
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwithStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
.detail("ShardSizePresent", shardSize->get().present())
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0 )
.detail("TrackerID", trackerID);*/
if( shardSize->get().present() && addToSizeEstimate )
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardSize->get().get().bytes );
shardSize->set( metrics );
} catch( Error &e ) {
//TraceEvent("ShardSizeUpdateError").error(e, true).detail("Begin", printable(keys.begin)).detail("End", printable(keys.end)).detail("TrackerID", trackerID);
Void _ = wait( tr.onError(e) );
} else {
bounds.max.bytes = -1;
bounds.min.bytes = -1;
bounds.permittedError.bytes = -1;
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
}
bounds.max.iosPerKSecond = bounds.max.infinity;
bounds.min.iosPerKSecond = 0;
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
Transaction tr(self->cx);
StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );
/*TraceEvent("ShardSizeUpdate")
.detail("Keys", printable(keys))
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwithStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
.detail("ShardSizePresent", shardSize->get().present())
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0 )
.detail("TrackerID", trackerID);*/
if( shardSize->get().present() && addToSizeEstimate )
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardSize->get().get().bytes );
shardSize->set( metrics );
}
} catch( Error &e ) {
if (e.code() != error_code_actor_cancelled)
@@ -283,12 +276,12 @@ ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRangeRef keys,
}
struct HasBeenTrueFor : NonCopyable {
explicit HasBeenTrueFor( double seconds, bool value ) : enough( seconds ), trigger( value ? Void() : Future<Void>() ) {}
explicit HasBeenTrueFor( bool value ) : trigger( value ? Void() : Future<Void>() ) {}
Future<Void> set() {
if( !trigger.isValid() ) {
cleared = Promise<Void>();
trigger = delayJittered( enough, TaskDataDistribution - 1 ) || cleared.getFuture();
trigger = delayJittered( SERVER_KNOBS->DD_MERGE_COALESCE_DELAY, TaskDataDistribution - 1 ) || cleared.getFuture();
}
return trigger;
}
@@ -308,12 +301,10 @@ struct HasBeenTrueFor : NonCopyable {
private:
Future<Void> trigger;
Promise<Void> cleared;
const double enough;
};
ACTOR Future<Void> shardSplitter(
DataDistributionTracker* self,
UID trackerId,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
ShardSizeBounds shardBounds )
@@ -340,7 +331,6 @@ ACTOR Future<Void> shardSplitter(
TraceEvent("RelocateShardStartSplitx100", self->masterId)
.detail("Begin", printable(keys.begin))
.detail("End", printable(keys.end))
.detail("TrackerID", trackerId)
.detail("MaxBytes", shardBounds.max.bytes)
.detail("MetricsBytes", metrics.bytes)
.detail("Bandwidth", bandwidthStatus == BandwidthStatusHigh ? "High" : bandwidthStatus == BandwidthStatusNormal ? "Normal" : "Low")
@@ -378,7 +368,6 @@ ACTOR Future<Void> shardSplitter(
Future<Void> shardMerger(
DataDistributionTracker* self,
UID trackerId,
KeyRange const& keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize )
{
@@ -464,8 +453,7 @@ Future<Void> shardMerger(
.detail("OldKeys", printable(keys))
.detail("NewKeys", printable(mergeRange))
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged)
.detail("TrackerID", trackerId);
.detail("BatchedMerges", shardsMerged);
restartShardTrackers( self, mergeRange, endingStats );
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
@@ -479,8 +467,7 @@ ACTOR Future<Void> shardEvaluator(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
HasBeenTrueFor *wantsToMerge,
UID trackerID)
HasBeenTrueFor *wantsToMerge)
{
Future<Void> onChange = shardSize->onChange() || yieldedFuture(self->maxShardSize->onChange());
@@ -515,10 +502,10 @@ ACTOR Future<Void> shardEvaluator(
.detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough());*/
if(!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) {
onChange = onChange || shardMerger( self, trackerID, keys, shardSize );
onChange = onChange || shardMerger( self, keys, shardSize );
}
if( shouldSplit ) {
onChange = onChange || shardSplitter( self, trackerID, keys, shardSize, shardBounds );
onChange = onChange || shardSplitter( self, keys, shardSize, shardBounds );
}
Void _ = wait( onChange );
@@ -528,11 +515,10 @@ ACTOR Future<Void> shardTracker(
ACTOR Future<Void> shardTracker(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
UID trackerID )
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize)
{
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
state HasBeenTrueFor wantsToMerge( SERVER_KNOBS->DD_MERGE_COALESCE_DELAY, shardSize->get().present() );
state HasBeenTrueFor wantsToMerge( shardSize->get().present() );
Void _ = wait( yieldedFuture(self->readyToStart.getFuture()) );
@@ -556,7 +542,7 @@ ACTOR Future<Void> shardTracker(
try {
loop {
// Use the current known size to check for (and start) splits and merges.
Void _ = wait( shardEvaluator( self, keys, shardSize, &wantsToMerge, trackerID ) );
Void _ = wait( shardEvaluator( self, keys, shardSize, &wantsToMerge ) );
// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
// delay(0) mitigates the resulting SlowTask
@@ -593,11 +579,10 @@ void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Opti
shardSize->set( startingSize );
}
UID trackerID = g_random->randomUniqueID();
ShardTrackedData data;
data.stats = shardSize;
data.trackShard = shardTracker( self, ranges[i], shardSize, trackerID );
data.trackBytes = trackShardBytes( self, keys, shardSize, trackerID );
data.trackShard = shardTracker( self, ranges[i], shardSize );
data.trackBytes = trackShardBytes( self, ranges[i], shardSize );
self->shards.insert( ranges[i], data );
}
}
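
The range fix itself is in the last hunk: restartShardTrackers splits `keys` into `ranges` and starts one pair of tracking actors per subrange, but trackShardBytes was handed the enclosing `keys` instead of `ranges[i]`, so every tracker measured the size of the whole restarted range rather than its own shard. A reduced sketch of the failure mode, with hypothetical sizes:

#include <cstdio>

int main() {
	long subrange[3] = {100, 120, 80};   // MB per shard, assumed
	long parent = 300;                   // the enclosing `keys` range
	long buggy = 0, fixed = 0;
	for (long b : subrange) {
		buggy += parent;                 // old code: every tracker measured `keys`
		fixed += b;                      // new code: each tracker measures ranges[i]
	}
	printf("buggy total: %ld MB, fixed total: %ld MB\n", buggy, fixed);
}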