Merge pull request #3834 from sfc-gh-tclinkenbeard/avoid-ddtracker-slow-task

Avoid slow task in ~DataDistributionTracker
Meng Xu 2020-10-08 11:00:19 -07:00 committed by GitHub
commit 011754fd7e
6 changed files with 83 additions and 70 deletions

View File

@@ -4,6 +4,10 @@
Release Notes
#############
6.2.27
======
* For clusters with a large number of shards, avoid slow tasks in the data distributor by adding yields to the shard map destruction. `(PR #3834) <https://github.com/apple/foundationdb/pull/3834>`_
6.2.26
======
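
A minimal sketch of why the yields help, in plain C++ rather than FoundationDB's Flow (the `yield` callback and the batch size are illustrative stand-ins for Flow's `yield()` and the real knobs): erasing a huge map in one destructor call is a single long task, while erasing in bounded batches with a yield in between keeps every task short.

#include <cstddef>
#include <functional>
#include <map>
#include <string>

// Erase a large map in bounded batches, invoking `yield` between batches so
// that no single task runs for too long.
template <class K, class V>
void clearInBatches(std::map<K, V>& m, const std::function<void()>& yield,
                    std::size_t batch = 1000) {
    auto it = m.begin();
    while (it != m.end()) {
        for (std::size_t n = 0; n < batch && it != m.end(); ++n)
            it = m.erase(it);  // erase returns the next iterator
        yield();               // let other tasks run before the next batch
    }
}

int main() {
    std::map<int, std::string> shards;
    for (int i = 0; i < 10000; ++i) shards.emplace(i, "shard");
    clearInBatches(shards, [] { /* the event loop would regain control here */ });
    return shards.empty() ? 0 : 1;
}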

View File

@@ -154,6 +154,8 @@ public:
void insert( const Range& keys, const Val& value );
Future<Void> clearAsync() { return map.clearAsync(); }
protected:
Map<Key,Val,pair_type,Metric> map;
const MetricFunc mf;
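
The new `clearAsync()` simply forwards to the inner `Map`, whose own `clearAsync()` (added in the last file of this diff) forwards to `IndexedSet::eraseAsync`, where the incremental erase actually happens. A sketch of that forwarding chain, with `std::future` standing in for Flow's `Future` and every `*Sketch` name invented for illustration:

#include <future>
#include <map>
#include <string>

struct IndexedSetSketch {
    std::map<std::string, int> nodes;
    std::future<void> eraseAsync() {
        // The real IndexedSet detaches its nodes and frees them with yields;
        // a deferred std::async is only a stand-in for "not right now".
        return std::async(std::launch::deferred, [this] { nodes.clear(); });
    }
};

struct MapSketch {
    IndexedSetSketch set;
    std::future<void> clearAsync() { return set.eraseAsync(); }  // forwarding
};

struct RangeMapSketch {
    MapSketch map;
    std::future<void> clearAsync() { return map.clearAsync(); }  // forwarding
};

int main() {
    RangeMapSketch shards;
    shards.map.set.nodes.emplace("shard", 1);
    shards.clearAsync().wait();  // the caller awaits completion
    return shards.map.set.nodes.empty() ? 0 : 1;
}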

View File

@@ -4297,6 +4297,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
zeroHealthyTeams.push_back(Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ));
int storageTeamSize = configuration.storageTeamSize;
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
state KeyRangeMap<ShardTrackedData> shards;
vector<Future<Void>> actors;
if (configuration.usableRegions > 1) {
tcis.push_back(TeamCollectionInterface());
@@ -4310,7 +4314,11 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
}
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
actors.push_back(
reportErrorsExcept(dataDistributionTracker(initData, cx, output, shardsAffectedByTeamFailure,
getShardMetrics, getAverageShardBytes.getFuture(),
readyToStart, anyZeroHealthyTeams, self->ddId, &shards),
"DDTracker", self->ddId, &normalDDQueueErrors()));
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, configuration.storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
vector<DDTeamCollection*> teamCollectionsPtrs;
@@ -4331,8 +4339,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
}
catch( Error &e ) {
state Error err = e;
if( e.code() != error_code_movekeys_conflict )
throw err;
wait(shards.clearAsync());
if (err.code() != error_code_movekeys_conflict) throw err;
bool ddEnabled = wait( isDataDistributionEnabled(cx) );
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if( ddEnabled )
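
A self-contained sketch of the lifetime inversion these hunks perform (std:: types stand in for Flow, and `ShardMap`, `TrackerSketch`, and `dataDistributionSketch` are illustrative names): the parent function owns the shard map, the tracker only borrows it, and the parent's error path awaits an asynchronous clear before deciding whether to rethrow, mirroring `wait(shards.clearAsync())` above.

#include <future>
#include <map>
#include <stdexcept>
#include <string>

using ShardMap = std::map<std::string, int>;  // stands in for KeyRangeMap<ShardTrackedData>

// Stand-in for KeyRangeMap::clearAsync(): defer the erase instead of paying
// for it inside a destructor.
std::future<void> clearAsync(ShardMap& m) {
    return std::async(std::launch::deferred, [&m] { m.clear(); });
}

struct TrackerSketch {
    ShardMap& shards;  // borrowed: the tracker no longer owns the big map
    explicit TrackerSketch(ShardMap& s) : shards(s) {}
    // The destructor is now cheap; it frees no map nodes.
};

void dataDistributionSketch() {
    ShardMap shards;  // lives in the long-running parent, as in the diff
    TrackerSketch tracker(shards);
    try {
        tracker.shards["shard"] = 1;  // the tracker works through its reference
        throw std::runtime_error("simulated DD failure");
    } catch (...) {
        clearAsync(shards).wait();  // mirrors wait(shards.clearAsync())
        throw;  // the real code rethrows only non-movekeys_conflict errors
    }
}

int main() {
    try { dataDistributionSketch(); } catch (const std::exception&) {}
    return 0;
}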

View File

@@ -183,31 +183,40 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
Optional<Key> initHealthyZoneValue;
};
Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> const& initData,
Database const& cx,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
Promise<Void> const& readyToStart,
Reference<AsyncVar<bool>> const& zeroHealthyTeams,
UID const& distributorId);
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
Future<Void> dataDistributionQueue(
Database const& cx,
PromiseStream<RelocateShard> const& output,
FutureStream<RelocateShard> const& input,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
Reference<AsyncVar<bool>> const& processingUnhealthy,
vector<TeamCollectionInterface> const& teamCollection,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
MoveKeysLock const& lock,
PromiseStream<Promise<int64_t>> const& getAverageShardBytes,
UID const& distributorId,
int const& teamSize,
int const& singleRegionTeamSize,
double* const& lastLimited);
bool operator==(ShardMetrics const& rhs) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime &&
shardCount == rhs.shardCount;
}
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount)
: metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {}
};
struct ShardTrackedData {
Future<Void> trackShard;
Future<Void> trackBytes;
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams,
UID distributorId, KeyRangeMap<ShardTrackedData>* shards);
ACTOR Future<Void> dataDistributionQueue(
Database cx, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics, Reference<AsyncVar<bool>> processingUnhealthy,
vector<TeamCollectionInterface> teamCollection, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock, PromiseStream<Promise<int64_t>> getAverageShardBytes, UID distributorId, int teamSize,
int singleRegionTeamSize, double* lastLimited);
//Holds the permitted size and IO Bounds for a shard
struct ShardSizeBounds {

View File

@@ -33,18 +33,6 @@ enum BandwidthStatus {
BandwidthStatusHigh
};
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
bool operator == ( ShardMetrics const& rhs ) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime && shardCount == rhs.shardCount;
}
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount) : metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {}
};
BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
if( metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC )
return BandwidthStatusHigh;
@@ -69,16 +57,11 @@ ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstima
}
}
struct ShardTrackedData {
Future<Void> trackShard;
Future<Void> trackBytes;
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
struct DataDistributionTracker {
Database cx;
UID distributorId;
KeyRangeMap< ShardTrackedData > shards;
KeyRangeMap<ShardTrackedData>& shards;
ActorCollection sizeChanges;
int64_t systemSizeEstimate;
@@ -93,16 +76,19 @@ struct DataDistributionTracker {
Promise<Void> readyToStart;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
: cx(cx), distributorId( distributorId ), dbSizeEstimate( new AsyncVar<int64_t>() ), systemSizeEstimate(0),
maxShardSize( new AsyncVar<Optional<int64_t>>() ),
sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
Reference<AsyncVar<bool>> anyZeroHealthyTeams, KeyRangeMap<ShardTrackedData>& shards)
: cx(cx), distributorId(distributorId), dbSizeEstimate(new AsyncVar<int64_t>()), systemSizeEstimate(0),
maxShardSize(new AsyncVar<Optional<int64_t>>()), sizeChanges(false), readyToStart(readyToStart), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams),
shards(shards) {}
~DataDistributionTracker()
{
// Cancel all actors so they aren't waiting on a broken sizeChanges promise
sizeChanges.clear(false);
shards.insert( allKeys, ShardTrackedData() );
}
};
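
With the map borrowed rather than owned, the destructor's remaining job is to shut the per-shard actors down promptly: inserting default-constructed ShardTrackedData over allKeys drops the old tracking futures, and dropping a future cancels its actor. A sketch of that "dropping the last handle cancels the work" idea, using invented names (`TrackHandle`, `Canceller`) rather than Flow's real machinery:

#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Models Future ownership: destroying the last handle flips a cancellation
// flag that the tracked work would observe.
struct Canceller {
    std::shared_ptr<std::atomic<bool>> flag;
    explicit Canceller(std::shared_ptr<std::atomic<bool>> f) : flag(std::move(f)) {}
    ~Canceller() { flag->store(true); }  // destruction == cancellation
};

struct TrackHandle {
    std::shared_ptr<std::atomic<bool>> cancelled =
        std::make_shared<std::atomic<bool>>(false);
    std::shared_ptr<Canceller> owner = std::make_shared<Canceller>(cancelled);
};

struct ShardData { TrackHandle trackShard; };  // cf. ShardTrackedData

int main() {
    std::map<std::string, ShardData> shards;
    shards.emplace("shard", ShardData{});
    auto flag = shards.at("shard").trackShard.cancelled;
    // The analogue of shards.insert(allKeys, ShardTrackedData()): overwrite
    // the entry with default-constructed data, dropping the old handle and
    // cancelling its tracker now, while the map's node storage is reclaimed
    // later by the parent's clearAsync().
    shards.at("shard") = ShardData{};
    return flag->load() ? 0 : 1;  // exits 0: the old tracker was cancelled
}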
@@ -292,7 +278,7 @@ ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<ShardMetrics>>>
ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRange keys, int64_t oldShardsEndingSize ) {
state vector<Future<int64_t>> sizes;
state vector<Future<int64_t>> systemSizes;
for (auto it : self->shards.intersectingRanges(keys) ) {
for (auto it : self->shards.intersectingRanges(keys)) {
Future<int64_t> thisSize = getFirstSize( it->value().stats );
sizes.push_back( thisSize );
if(it->range().begin >= systemKeys.begin) {
@@ -641,8 +627,8 @@ ACTOR Future<Void> shardTracker(
}
}
void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingSize ) {
auto ranges = self->shards.getAffectedRangesAfterInsertion( keys, ShardTrackedData() );
void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingSize) {
auto ranges = self->shards.getAffectedRangesAfterInsertion(keys, ShardTrackedData());
for(int i=0; i<ranges.size(); i++) {
if( !ranges[i].value.trackShard.isValid() && ranges[i].begin != keys.begin ) {
// When starting, key space will be full of "dummy" default-constructed entries.
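
The `isValid()` test above works because a default-constructed Future is invalid, so placeholder entries can be told apart from ranges with live trackers. A tiny stand-in using `std::optional` for that validity check (all names illustrative):

#include <map>
#include <optional>
#include <string>

using TrackerSlot = std::optional<std::string>;  // nullopt == invalid future

int main() {
    std::map<std::string, TrackerSlot> ranges;
    ranges["a"] = std::nullopt;     // "dummy" entry, as at startup
    ranges["b"] = "tracker for b";  // a live tracker
    int dummies = 0;
    for (const auto& [key, slot] : ranges)
        if (!slot.has_value()) ++dummies;  // analogue of !trackShard.isValid()
    return dummies == 1 ? 0 : 1;
}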
@@ -700,7 +686,7 @@ ACTOR Future<Void> fetchShardMetrics_impl( DataDistributionTracker* self, GetMet
loop {
Future<Void> onChange;
StorageMetrics returnMetrics;
for( auto t : self->shards.intersectingRanges( req.keys ) ) {
for (auto t : self->shards.intersectingRanges(req.keys)) {
auto &stats = t.value().stats;
if( !stats->get().present() ) {
onChange = stats->onChange();
@@ -736,18 +722,15 @@ ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsR
return Void();
}
ACTOR Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> initData,
Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId)
{
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure, anyZeroHealthyTeams);
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId, KeyRangeMap<ShardTrackedData>* shards) {
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure,
anyZeroHealthyTeams, *shards);
state Future<Void> loggingTrigger = Void();
try {
wait( trackInitialShards( &self, initData ) );
@@ -759,10 +742,10 @@ ACTOR Future<Void> dataDistributionTracker(
}
when( wait( loggingTrigger ) ) {
TraceEvent("DDTrackerStats", self.distributorId)
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest( "DDTrackerStats" );
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest("DDTrackerStats");
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
}

View File

@@ -320,6 +320,8 @@ public:
Map(Map&& r) BOOST_NOEXCEPT : set(std::move(r.set)) {}
void operator=(Map&& r) BOOST_NOEXCEPT { set = std::move(r.set); }
Future<Void> clearAsync();
private:
Map( Map<Key,Value,Pair> const& ); // unimplemented
void operator=( Map<Key,Value,Pair> const& ); // unimplemented
@@ -1112,4 +1114,9 @@ Future<Void> IndexedSet<T, Metric>::eraseAsync(typename IndexedSet<T,Metric>::it
return uncancellable(ISFreeNodes(toFree, false));
}
template <class Key, class Value, class Pair, class Metric>
Future<Void> Map<Key, Value, Pair, Metric>::clearAsync() {
return set.eraseAsync(set.begin(), set.end());
}
#endif
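
The chain bottoms out here: RangeMap::clearAsync calls Map::clearAsync, which calls IndexedSet::eraseAsync, which detaches the nodes and frees them inside an uncancellable task so the memory is reclaimed even if the caller's future is dropped. A rough sketch of the batch-free idea (names, batch size, and the yield placeholder are illustrative):

#include <cstddef>
#include <list>

struct Node { int payload = 0; };

// Free already-detached nodes a batch at a time; a Flow actor would
// wait(yield()) between batches, and the whole task would be wrapped in
// uncancellable(...) so it always runs to completion.
void freeNodesInBatches(std::list<Node*>& toFree, std::size_t batch = 100) {
    while (!toFree.empty()) {
        for (std::size_t i = 0; i < batch && !toFree.empty(); ++i) {
            delete toFree.front();
            toFree.pop_front();
        }
        // yield point between batches would go here
    }
}

int main() {
    std::list<Node*> detached;
    for (int i = 0; i < 1000; ++i) detached.push_back(new Node{i});
    freeNodesInBatches(detached);
    return detached.empty() ? 0 : 1;
}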