Added a trace event to warn if a shard is merged before enough time has elapses from becoming low bandwidth
This commit is contained in:
parent
c2608f0af9
commit
9b80498180
|
@ -756,10 +756,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
int64_t bestLoadBytes = 0;
|
||||
Optional<Reference<IDataDistributionTeam>> bestOption;
|
||||
std::vector<Reference<IDataDistributionTeam>> randomTeams;
|
||||
std::set<UID> completeSources;
|
||||
for( int i = 0; i < req.completeSources.size(); i++ ) {
|
||||
completeSources.insert( req.completeSources[i] );
|
||||
}
|
||||
const std::set<UID> completeSources(req.completeSources.begin(), req.completeSources.end());
|
||||
|
||||
if( !req.wantsNewServers ) {
|
||||
for( int i = 0; i < req.completeSources.size(); i++ ) {
|
||||
if( !self->server_info.count( req.completeSources[i] ) ) {
|
||||
|
|
|
@ -35,13 +35,13 @@ enum BandwidthStatus {
|
|||
|
||||
struct ShardMetrics {
|
||||
StorageMetrics metrics;
|
||||
double lastLowBandwidthTime;
|
||||
double lastLowBandwidthStartTime;
|
||||
|
||||
bool operator == ( ShardMetrics const& rhs ) const {
|
||||
return metrics == rhs.metrics && lastLowBandwidthTime == rhs.lastLowBandwidthTime;
|
||||
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime;
|
||||
}
|
||||
|
||||
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthTime) : metrics(metrics), lastLowBandwidthTime(lastLowBandwidthTime) {}
|
||||
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime) : metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime) {}
|
||||
};
|
||||
|
||||
BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
|
||||
|
@ -153,7 +153,7 @@ ACTOR Future<Void> trackShardBytes(
|
|||
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize)
|
||||
{
|
||||
state BandwidthStatus bandwidthStatus = shardSize->get().present() ? getBandwidthStatus( shardSize->get().get().metrics ) : BandwidthStatusNormal;
|
||||
state double lastLowBandwidthTime = shardSize->get().present() ? shardSize->get().get().lastLowBandwidthTime : now();
|
||||
state double lastLowBandwidthStartTime = shardSize->get().present() ? shardSize->get().get().lastLowBandwidthStartTime : now();
|
||||
wait( delay( 0, TaskPriority::DataDistribution ) );
|
||||
|
||||
/*TraceEvent("TrackShardBytesStarting")
|
||||
|
@ -203,7 +203,7 @@ ACTOR Future<Void> trackShardBytes(
|
|||
StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );
|
||||
BandwidthStatus newBandwidthStatus = getBandwidthStatus( metrics );
|
||||
if(newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) {
|
||||
lastLowBandwidthTime = now();
|
||||
lastLowBandwidthStartTime = now();
|
||||
}
|
||||
bandwidthStatus = newBandwidthStatus;
|
||||
|
||||
|
@ -227,7 +227,7 @@ ACTOR Future<Void> trackShardBytes(
|
|||
}
|
||||
}
|
||||
|
||||
shardSize->set( ShardMetrics(metrics, lastLowBandwidthTime) );
|
||||
shardSize->set( ShardMetrics(metrics, lastLowBandwidthStartTime) );
|
||||
}
|
||||
} catch( Error &e ) {
|
||||
if (e.code() != error_code_actor_cancelled)
|
||||
|
@ -419,7 +419,11 @@ Future<Void> shardMerger(
|
|||
bool forwardComplete = false;
|
||||
KeyRangeRef merged;
|
||||
StorageMetrics endingStats = shardSize->get().get().metrics;
|
||||
double lastLowBandwidthTime = shardSize->get().get().lastLowBandwidthTime;
|
||||
double lastLowBandwidthStartTime = shardSize->get().get().lastLowBandwidthStartTime;
|
||||
if(SERVER_KNOBS->DD_MERGE_COALESCE_DELAY > SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY && now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY) {
|
||||
TraceEvent( g_network->isSimulated() ? SevError : SevWarnAlways, "ShardMergeTooSoon", self->distributorId).detail("Keys", keys);
|
||||
}
|
||||
|
||||
int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().metrics.bytes : 0;
|
||||
|
||||
loop {
|
||||
|
@ -458,7 +462,7 @@ Future<Void> shardMerger(
|
|||
|
||||
merged = KeyRangeRef( prevIter->range().begin, nextIter->range().end );
|
||||
endingStats += newMetrics.get().metrics;
|
||||
lastLowBandwidthTime = newMetrics.get().lastLowBandwidthTime;
|
||||
lastLowBandwidthStartTime = newMetrics.get().lastLowBandwidthStartTime;
|
||||
if((forwardComplete ? prevIter->range().begin : nextIter->range().begin) >= systemKeys.begin) {
|
||||
systemBytes += newMetrics.get().metrics.bytes;
|
||||
}
|
||||
|
@ -467,7 +471,7 @@ Future<Void> shardMerger(
|
|||
auto shardBounds = getShardSizeBounds( merged, maxShardSize );
|
||||
if( endingStats.bytes >= shardBounds.min.bytes ||
|
||||
getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
|
||||
now() - lastLowBandwidthTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
|
||||
now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
|
||||
shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT ) {
|
||||
// The merged range is larger than the min bounds so we cannot continue merging in this direction.
|
||||
// This means that:
|
||||
|
@ -503,7 +507,7 @@ Future<Void> shardMerger(
|
|||
if(mergeRange.begin < systemKeys.begin) {
|
||||
self->systemSizeEstimate -= systemBytes;
|
||||
}
|
||||
restartShardTrackers( self, mergeRange, ShardMetrics(endingStats, lastLowBandwidthTime) );
|
||||
restartShardTrackers( self, mergeRange, ShardMetrics(endingStats, lastLowBandwidthStartTime) );
|
||||
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
|
||||
self->output.send( RelocateShard( mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD ) );
|
||||
|
||||
|
|
|
@ -181,8 +181,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
|||
init( DATA_DISTRIBUTION_LOGGING_INTERVAL, 5.0 );
|
||||
init( DD_ENABLED_CHECK_DELAY, 1.0 );
|
||||
init( DD_STALL_CHECK_DELAY, 0.4 ); //Must be larger than 2*MAX_BUGGIFIED_DELAY
|
||||
init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 90.0 : 280.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0;
|
||||
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 120.0 : 300.0 ); if( randomize && BUGGIFY ) { DD_LOW_BANDWIDTH_DELAY = 0; DD_MERGE_COALESCE_DELAY = 0.001; }
|
||||
init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 100.0 : 280.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0;
|
||||
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 120.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
|
||||
init( STORAGE_METRICS_POLLING_DELAY, 2.0 ); if( randomize && BUGGIFY ) STORAGE_METRICS_POLLING_DELAY = 15.0;
|
||||
init( STORAGE_METRICS_RANDOM_DELAY, 0.2 );
|
||||
init( FREE_SPACE_RATIO_CUTOFF, 0.1 );
|
||||
|
|
Loading…
Reference in New Issue