1100 lines
44 KiB
C++
1100 lines
44 KiB
C++
/*
|
|
* DataDistributionTracker.actor.cpp
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include "fdbrpc/FailureMonitor.h"
|
|
#include "fdbclient/SystemData.h"
|
|
#include "fdbserver/DataDistribution.actor.h"
|
|
#include "fdbserver/Knobs.h"
|
|
#include "fdbclient/DatabaseContext.h"
|
|
#include "flow/ActorCollection.h"
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
|
|
|
// The used bandwidth of a shard. The higher the value is, the busier the shard is.
|
|
enum BandwidthStatus {
|
|
BandwidthStatusLow,
|
|
BandwidthStatusNormal,
|
|
BandwidthStatusHigh
|
|
};
|
|
|
|
enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh };
|
|
|
|
BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
|
|
if( metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC )
|
|
return BandwidthStatusHigh;
|
|
else if( metrics.bytesPerKSecond < SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC )
|
|
return BandwidthStatusLow;
|
|
|
|
return BandwidthStatusNormal;
|
|
}
|
|
|
|
ReadBandwidthStatus getReadBandwidthStatus(StorageMetrics const& metrics) {
|
|
if (metrics.bytesReadPerKSecond <= SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS ||
|
|
metrics.bytesReadPerKSecond <= SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO * metrics.bytes *
|
|
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS) {
|
|
return ReadBandwidthStatusNormal;
|
|
} else {
|
|
return ReadBandwidthStatusHigh;
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstimate, Reference<AsyncVar<Optional<int64_t>>> maxShardSize ) {
|
|
state int64_t lastDbSize = 0;
|
|
state int64_t granularity = g_network->isSimulated() ?
|
|
SERVER_KNOBS->DD_SHARD_SIZE_GRANULARITY_SIM : SERVER_KNOBS->DD_SHARD_SIZE_GRANULARITY;
|
|
loop {
|
|
auto sizeDelta = std::abs(dbSizeEstimate->get() - lastDbSize);
|
|
if( sizeDelta > granularity || !maxShardSize->get().present() ) {
|
|
auto v = getMaxShardSize( dbSizeEstimate->get() );
|
|
maxShardSize->set( v );
|
|
lastDbSize = dbSizeEstimate->get();
|
|
}
|
|
wait( dbSizeEstimate->onChange() );
|
|
}
|
|
}
|
|
|
|
|
|
struct DataDistributionTracker {
|
|
Database cx;
|
|
UID distributorId;
|
|
KeyRangeMap<ShardTrackedData>& shards;
|
|
ActorCollection sizeChanges;
|
|
|
|
int64_t systemSizeEstimate;
|
|
Reference<AsyncVar<int64_t>> dbSizeEstimate;
|
|
Reference<AsyncVar<Optional<int64_t>>> maxShardSize;
|
|
Future<Void> maxShardSizeUpdater;
|
|
|
|
// CapacityTracker
|
|
PromiseStream<RelocateShard> output;
|
|
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
|
|
|
|
Promise<Void> readyToStart;
|
|
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
|
|
|
|
// Read hot detection
|
|
PromiseStream<KeyRange> readHotShard;
|
|
|
|
// The reference to trackerCancelled must be extracted by actors,
|
|
// because by the time (trackerCancelled == true) this memory cannot
|
|
// be accessed
|
|
bool const& trackerCancelled;
|
|
|
|
// This class extracts the trackerCancelled reference from a DataDistributionTracker object
|
|
// Because some actors spawned by the dataDistributionTracker outlive the DataDistributionTracker
|
|
// object, we must guard against memory errors by using a GetTracker functor to access
|
|
// the DataDistributionTracker object.
|
|
class SafeAccessor {
|
|
bool const& trackerCancelled;
|
|
DataDistributionTracker& tracker;
|
|
|
|
public:
|
|
SafeAccessor(DataDistributionTracker* tracker)
|
|
: trackerCancelled(tracker->trackerCancelled), tracker(*tracker) {
|
|
ASSERT(!trackerCancelled);
|
|
}
|
|
|
|
DataDistributionTracker* operator()() {
|
|
if (trackerCancelled) {
|
|
TEST(true); // Trying to access DataDistributionTracker after tracker has been cancelled
|
|
throw dd_tracker_cancelled();
|
|
}
|
|
return &tracker;
|
|
}
|
|
};
|
|
|
|
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart,
|
|
PromiseStream<RelocateShard> const& output,
|
|
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
|
|
Reference<AsyncVar<bool>> anyZeroHealthyTeams, KeyRangeMap<ShardTrackedData>& shards,
|
|
bool const& trackerCancelled)
|
|
: cx(cx), distributorId(distributorId), dbSizeEstimate(new AsyncVar<int64_t>()), systemSizeEstimate(0),
|
|
maxShardSize(new AsyncVar<Optional<int64_t>>()), sizeChanges(false), readyToStart(readyToStart), output(output),
|
|
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams),
|
|
shards(shards), trackerCancelled(trackerCancelled) {}
|
|
|
|
~DataDistributionTracker()
|
|
{
|
|
//Cancel all actors so they aren't waiting on sizeChanged broken promise
|
|
sizeChanges.clear(false);
|
|
}
|
|
};
|
|
|
|
void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys,
|
|
Optional<ShardMetrics> startingMetrics = Optional<ShardMetrics>());
|
|
|
|
// Gets the permitted size and IO bounds for a shard. A shard that starts at allKeys.begin
|
|
// (i.e. '') will have a permitted size of 0, since the database can contain no data.
|
|
ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize) {
|
|
ShardSizeBounds bounds;
|
|
|
|
if(shard.begin >= keyServersKeys.begin) {
|
|
bounds.max.bytes = SERVER_KNOBS->KEY_SERVER_SHARD_BYTES;
|
|
} else {
|
|
bounds.max.bytes = maxShardSize;
|
|
}
|
|
|
|
bounds.max.bytesPerKSecond = bounds.max.infinity;
|
|
bounds.max.iosPerKSecond = bounds.max.infinity;
|
|
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
|
|
|
|
//The first shard can have arbitrarily small size
|
|
if(shard.begin == allKeys.begin) {
|
|
bounds.min.bytes = 0;
|
|
} else {
|
|
bounds.min.bytes = maxShardSize / SERVER_KNOBS->SHARD_BYTES_RATIO;
|
|
}
|
|
|
|
bounds.min.bytesPerKSecond = 0;
|
|
bounds.min.iosPerKSecond = 0;
|
|
bounds.min.bytesReadPerKSecond = 0;
|
|
|
|
//The permitted error is 1/3 of the general-case minimum bytes (even in the special case where this is the last shard)
|
|
bounds.permittedError.bytes = bounds.max.bytes / SERVER_KNOBS->SHARD_BYTES_RATIO / 3;
|
|
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
|
|
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
|
|
bounds.permittedError.bytesReadPerKSecond = bounds.permittedError.infinity;
|
|
|
|
return bounds;
|
|
}
|
|
|
|
int64_t getMaxShardSize( double dbSizeEstimate ) {
|
|
return std::min((SERVER_KNOBS->MIN_SHARD_BYTES + (int64_t)std::sqrt( dbSizeEstimate )*SERVER_KNOBS->SHARD_BYTES_PER_SQRT_BYTES) * SERVER_KNOBS->SHARD_BYTES_RATIO,
|
|
(int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
|
|
}
|
|
|
|
ACTOR Future<Void> trackShardMetrics(DataDistributionTracker::SafeAccessor self, KeyRange keys,
|
|
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics) {
|
|
state BandwidthStatus bandwidthStatus = shardMetrics->get().present() ? getBandwidthStatus( shardMetrics->get().get().metrics ) : BandwidthStatusNormal;
|
|
state double lastLowBandwidthStartTime = shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now();
|
|
state int shardCount = shardMetrics->get().present() ? shardMetrics->get().get().shardCount : 1;
|
|
state ReadBandwidthStatus readBandwidthStatus = shardMetrics->get().present() ? getReadBandwidthStatus(shardMetrics->get().get().metrics) : ReadBandwidthStatusNormal;
|
|
|
|
wait( delay( 0, TaskPriority::DataDistribution ) );
|
|
|
|
/*TraceEvent("TrackShardMetricsStarting")
|
|
.detail("TrackerID", trackerID)
|
|
.detail("Keys", keys)
|
|
.detail("TrackedBytesInitiallyPresent", shardMetrics->get().present())
|
|
.detail("StartingMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0)
|
|
.detail("StartingMerges", shardMetrics->get().present() ? shardMetrics->get().get().merges : 0);*/
|
|
|
|
try {
|
|
loop {
|
|
state ShardSizeBounds bounds;
|
|
if( shardMetrics->get().present() ) {
|
|
auto bytes = shardMetrics->get().get().metrics.bytes;
|
|
auto readBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics);
|
|
|
|
bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
|
|
bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
|
|
bounds.permittedError.bytes = bytes * 0.1;
|
|
if( bandwidthStatus == BandwidthStatusNormal ) { // Not high or low
|
|
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
|
|
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
|
|
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
|
|
} else if( bandwidthStatus == BandwidthStatusHigh ) { // > 10MB/sec for 100MB shard, proportionally lower for smaller shard, > 200KB/sec no matter what
|
|
bounds.max.bytesPerKSecond = bounds.max.infinity;
|
|
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
|
|
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
|
|
} else if( bandwidthStatus == BandwidthStatusLow ) { // < 10KB/sec
|
|
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
|
|
bounds.min.bytesPerKSecond = 0;
|
|
bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
|
|
} else {
|
|
ASSERT( false );
|
|
}
|
|
// handle read bandkwith status
|
|
if (readBandwidthStatus == ReadBandwidthStatusNormal) {
|
|
bounds.max.bytesReadPerKSecond =
|
|
std::max((int64_t)(SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO * bytes *
|
|
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS *
|
|
(1.0 + SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC_JITTER)),
|
|
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
|
|
bounds.min.bytesReadPerKSecond = 0;
|
|
bounds.permittedError.bytesReadPerKSecond = bounds.min.bytesReadPerKSecond / 4;
|
|
} else if (readBandwidthStatus == ReadBandwidthStatusHigh) {
|
|
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
|
|
bounds.min.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO * bytes *
|
|
SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS *
|
|
(1.0 - SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC_JITTER);
|
|
bounds.permittedError.bytesReadPerKSecond = bounds.min.bytesReadPerKSecond / 4;
|
|
// TraceEvent("RHDTriggerReadHotLoggingForShard")
|
|
// .detail("ShardBegin", keys.begin.printable().c_str())
|
|
// .detail("ShardEnd", keys.end.printable().c_str());
|
|
self()->readHotShard.send(keys);
|
|
} else {
|
|
ASSERT(false);
|
|
}
|
|
} else {
|
|
bounds.max.bytes = -1;
|
|
bounds.min.bytes = -1;
|
|
bounds.permittedError.bytes = -1;
|
|
bounds.max.bytesPerKSecond = bounds.max.infinity;
|
|
bounds.min.bytesPerKSecond = 0;
|
|
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
|
|
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
|
|
bounds.min.bytesReadPerKSecond = 0;
|
|
bounds.permittedError.bytesReadPerKSecond = bounds.permittedError.infinity;
|
|
}
|
|
|
|
bounds.max.iosPerKSecond = bounds.max.infinity;
|
|
bounds.min.iosPerKSecond = 0;
|
|
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
|
|
|
|
loop {
|
|
Transaction tr(self()->cx);
|
|
// metrics.second is the number of key-ranges (i.e., shards) in the 'keys' key-range
|
|
std::pair<Optional<StorageMetrics>, int> metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, shardCount ) );
|
|
if(metrics.first.present()) {
|
|
BandwidthStatus newBandwidthStatus = getBandwidthStatus( metrics.first.get() );
|
|
if(newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) {
|
|
lastLowBandwidthStartTime = now();
|
|
}
|
|
bandwidthStatus = newBandwidthStatus;
|
|
|
|
/*TraceEvent("ShardSizeUpdate")
|
|
.detail("Keys", keys)
|
|
.detail("UpdatedSize", metrics.metrics.bytes)
|
|
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
|
|
.detail("BandwithStatus", getBandwidthStatus(metrics))
|
|
.detail("BytesLower", bounds.min.bytes)
|
|
.detail("BytesUpper", bounds.max.bytes)
|
|
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
|
|
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
|
|
.detail("ShardSizePresent", shardSize->get().present())
|
|
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
|
|
.detail("TrackerID", trackerID);*/
|
|
|
|
if( shardMetrics->get().present() ) {
|
|
self()->dbSizeEstimate->set(self()->dbSizeEstimate->get() + metrics.first.get().bytes -
|
|
shardMetrics->get().get().metrics.bytes);
|
|
if(keys.begin >= systemKeys.begin) {
|
|
self()->systemSizeEstimate +=
|
|
metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes;
|
|
}
|
|
}
|
|
|
|
shardMetrics->set( ShardMetrics(metrics.first.get(), lastLowBandwidthStartTime, shardCount) );
|
|
break;
|
|
} else {
|
|
shardCount = metrics.second;
|
|
if(shardMetrics->get().present()) {
|
|
auto newShardMetrics = shardMetrics->get().get();
|
|
newShardMetrics.shardCount = shardCount;
|
|
shardMetrics->set( newShardMetrics );
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch( Error &e ) {
|
|
if (e.code() != error_code_actor_cancelled && e.code() != error_code_dd_tracker_cancelled) {
|
|
self()->output.sendError(e); // Propagate failure to dataDistributionTracker
|
|
}
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> readHotDetector(DataDistributionTracker* self) {
|
|
try {
|
|
loop {
|
|
state KeyRange keys = waitNext(self->readHotShard.getFuture());
|
|
state Transaction tr(self->cx);
|
|
loop {
|
|
try {
|
|
Standalone<VectorRef<ReadHotRangeWithMetrics>> readHotRanges = wait(tr.getReadHotRanges(keys));
|
|
for (auto& keyRange : readHotRanges) {
|
|
TraceEvent("ReadHotRangeLog")
|
|
.detail("ReadDensity", keyRange.density)
|
|
.detail("ReadBandwidth", keyRange.readBandwidth)
|
|
.detail("ReadDensityThreshold", SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO)
|
|
.detail("KeyRangeBegin", keyRange.keys.begin)
|
|
.detail("KeyRangeEnd", keyRange.keys.end);
|
|
}
|
|
break;
|
|
} catch (Error& e) {
|
|
wait(tr.onError(e));
|
|
}
|
|
}
|
|
}
|
|
} catch (Error& e) {
|
|
if (e.code() != error_code_actor_cancelled)
|
|
self->output.sendError(e); // Propagate failure to dataDistributionTracker
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
/*
|
|
ACTOR Future<Void> extrapolateShardBytes( Reference<AsyncVar<Optional<int64_t>>> inBytes, Reference<AsyncVar<Optional<int64_t>>> outBytes ) {
|
|
state std::deque< std::pair<double,int64_t> > past;
|
|
loop {
|
|
wait( inBytes->onChange() );
|
|
if( inBytes->get().present() ) {
|
|
past.push_back( std::make_pair(now(),inBytes->get().get()) );
|
|
if (past.size() < 2)
|
|
outBytes->set( inBytes->get() );
|
|
else {
|
|
while (past.size() > 1 && past.end()[-1].first - past.begin()[1].first > 1.0)
|
|
past.pop_front();
|
|
double rate = std::max(0.0, double(past.end()[-1].second-past.begin()[0].second)/(past.end()[-1].first - past.begin()[0].first));
|
|
outBytes->set( inBytes->get().get() + rate * 10.0 );
|
|
}
|
|
}
|
|
}
|
|
}*/
|
|
|
|
ACTOR Future<Standalone<VectorRef<KeyRef>>> getSplitKeys( DataDistributionTracker* self, KeyRange splitRange, StorageMetrics splitMetrics, StorageMetrics estimated ) {
|
|
loop {
|
|
state Transaction tr(self->cx);
|
|
try {
|
|
Standalone<VectorRef<KeyRef>> keys = wait( tr.splitStorageMetrics( splitRange, splitMetrics, estimated ) );
|
|
return keys;
|
|
} catch( Error &e ) {
|
|
wait( tr.onError(e) );
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<ShardMetrics>>> stats ) {
|
|
loop {
|
|
if(stats->get().present())
|
|
return stats->get().get().metrics.bytes;
|
|
wait( stats->onChange() );
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRange keys, int64_t oldShardsEndingSize ) {
|
|
state vector<Future<int64_t>> sizes;
|
|
state vector<Future<int64_t>> systemSizes;
|
|
for (auto it : self->shards.intersectingRanges(keys)) {
|
|
Future<int64_t> thisSize = getFirstSize( it->value().stats );
|
|
sizes.push_back( thisSize );
|
|
if(it->range().begin >= systemKeys.begin) {
|
|
systemSizes.push_back( thisSize );
|
|
}
|
|
}
|
|
|
|
wait( waitForAll( sizes ) );
|
|
wait( yield(TaskPriority::DataDistribution) );
|
|
|
|
int64_t newShardsStartingSize = 0;
|
|
for ( int i = 0; i < sizes.size(); i++ )
|
|
newShardsStartingSize += sizes[i].get();
|
|
|
|
int64_t newSystemShardsStartingSize = 0;
|
|
for ( int i = 0; i < systemSizes.size(); i++ )
|
|
newSystemShardsStartingSize += systemSizes[i].get();
|
|
|
|
int64_t totalSizeEstimate = self->dbSizeEstimate->get();
|
|
/*TraceEvent("TrackerChangeSizes")
|
|
.detail("TotalSizeEstimate", totalSizeEstimate)
|
|
.detail("EndSizeOfOldShards", oldShardsEndingSize)
|
|
.detail("StartingSizeOfNewShards", newShardsStartingSize);*/
|
|
self->dbSizeEstimate->set( totalSizeEstimate + newShardsStartingSize - oldShardsEndingSize );
|
|
self->systemSizeEstimate += newSystemShardsStartingSize;
|
|
if(keys.begin >= systemKeys.begin) {
|
|
self->systemSizeEstimate -= oldShardsEndingSize;
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
struct HasBeenTrueFor : ReferenceCounted<HasBeenTrueFor> {
|
|
explicit HasBeenTrueFor(const Optional<ShardMetrics>& value) {
|
|
if(value.present()) {
|
|
trigger = delayJittered(std::max(0.0, SERVER_KNOBS->DD_MERGE_COALESCE_DELAY + value.get().lastLowBandwidthStartTime - now()), TaskPriority::DataDistributionLow ) || cleared.getFuture();
|
|
}
|
|
}
|
|
|
|
Future<Void> set(double lastLowBandwidthStartTime) {
|
|
if( !trigger.isValid() ) {
|
|
cleared = Promise<Void>();
|
|
trigger =
|
|
delayJittered(SERVER_KNOBS->DD_MERGE_COALESCE_DELAY + std::max(lastLowBandwidthStartTime - now(), 0.0),
|
|
TaskPriority::DataDistributionLow) ||
|
|
cleared.getFuture();
|
|
}
|
|
return trigger;
|
|
}
|
|
void clear() {
|
|
if( !trigger.isValid() ) {
|
|
return;
|
|
}
|
|
trigger = Future<Void>();
|
|
cleared.send( Void() );
|
|
}
|
|
|
|
// True if this->value is true and has been true for this->seconds
|
|
bool hasBeenTrueForLongEnough() const {
|
|
return trigger.isValid() && trigger.isReady();
|
|
}
|
|
|
|
private:
|
|
Future<Void> trigger;
|
|
Promise<Void> cleared;
|
|
};
|
|
|
|
ACTOR Future<Void> shardSplitter(
|
|
DataDistributionTracker* self,
|
|
KeyRange keys,
|
|
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
|
|
ShardSizeBounds shardBounds )
|
|
{
|
|
state StorageMetrics metrics = shardSize->get().get().metrics;
|
|
state BandwidthStatus bandwidthStatus = getBandwidthStatus( metrics );
|
|
|
|
//Split
|
|
TEST(true); // shard to be split
|
|
|
|
StorageMetrics splitMetrics;
|
|
splitMetrics.bytes = shardBounds.max.bytes / 2;
|
|
splitMetrics.bytesPerKSecond = keys.begin >= keyServersKeys.begin ? splitMetrics.infinity : SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
|
|
splitMetrics.iosPerKSecond = splitMetrics.infinity;
|
|
splitMetrics.bytesReadPerKSecond = splitMetrics.infinity; // Don't split by readBandwidth
|
|
|
|
state Standalone<VectorRef<KeyRef>> splitKeys = wait( getSplitKeys(self, keys, splitMetrics, metrics ) );
|
|
//fprintf(stderr, "split keys:\n");
|
|
//for( int i = 0; i < splitKeys.size(); i++ ) {
|
|
// fprintf(stderr, " %s\n", printable(splitKeys[i]).c_str());
|
|
//}
|
|
int numShards = splitKeys.size() - 1;
|
|
|
|
if( deterministicRandom()->random01() < 0.01 ) {
|
|
TraceEvent("RelocateShardStartSplitx100", self->distributorId)
|
|
.detail("Begin", keys.begin)
|
|
.detail("End", keys.end)
|
|
.detail("MaxBytes", shardBounds.max.bytes)
|
|
.detail("MetricsBytes", metrics.bytes)
|
|
.detail("Bandwidth", bandwidthStatus == BandwidthStatusHigh ? "High" : bandwidthStatus == BandwidthStatusNormal ? "Normal" : "Low")
|
|
.detail("BytesPerKSec", metrics.bytesPerKSecond)
|
|
.detail("NumShards", numShards);
|
|
}
|
|
|
|
if( numShards > 1 ) {
|
|
int skipRange = deterministicRandom()->randomInt(0, numShards);
|
|
// The queue can't deal with RelocateShard requests which split an existing shard into three pieces, so
|
|
// we have to send the unskipped ranges in this order (nibbling in from the edges of the old range)
|
|
for( int i = 0; i < skipRange; i++ )
|
|
restartShardTrackers( self, KeyRangeRef(splitKeys[i], splitKeys[i+1]) );
|
|
restartShardTrackers( self, KeyRangeRef( splitKeys[skipRange], splitKeys[skipRange+1] ) );
|
|
for( int i = numShards-1; i > skipRange; i-- )
|
|
restartShardTrackers( self, KeyRangeRef(splitKeys[i], splitKeys[i+1]) );
|
|
|
|
for( int i = 0; i < skipRange; i++ ) {
|
|
KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
|
|
self->shardsAffectedByTeamFailure->defineShard( r );
|
|
self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) );
|
|
}
|
|
for( int i = numShards-1; i > skipRange; i-- ) {
|
|
KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
|
|
self->shardsAffectedByTeamFailure->defineShard( r );
|
|
self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) );
|
|
}
|
|
|
|
self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().metrics.bytes ) );
|
|
} else {
|
|
wait( delay(1.0, TaskPriority::DataDistribution) ); //In case the reason the split point was off was due to a discrepancy between storage servers
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
ACTOR Future<Void> brokenPromiseToReady( Future<Void> f ) {
|
|
try {
|
|
wait(f);
|
|
} catch( Error &e ) {
|
|
if(e.code() != error_code_broken_promise) {
|
|
throw;
|
|
}
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
Future<Void> shardMerger(
|
|
DataDistributionTracker* self,
|
|
KeyRange const& keys,
|
|
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize )
|
|
{
|
|
int64_t maxShardSize = self->maxShardSize->get().get();
|
|
|
|
auto prevIter = self->shards.rangeContaining(keys.begin);
|
|
auto nextIter = self->shards.rangeContaining(keys.begin);
|
|
|
|
TEST(true); // shard to be merged
|
|
ASSERT( keys.begin > allKeys.begin );
|
|
|
|
// This will merge shards both before and after "this" shard in keyspace.
|
|
int shardsMerged = 1;
|
|
bool forwardComplete = false;
|
|
KeyRangeRef merged;
|
|
StorageMetrics endingStats = shardSize->get().get().metrics;
|
|
int shardCount = shardSize->get().get().shardCount;
|
|
double lastLowBandwidthStartTime = shardSize->get().get().lastLowBandwidthStartTime;
|
|
if(FLOW_KNOBS->DELAY_JITTER_OFFSET*SERVER_KNOBS->DD_MERGE_COALESCE_DELAY > SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY && now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY) {
|
|
TraceEvent( g_network->isSimulated() ? SevError : SevWarnAlways, "ShardMergeTooSoon", self->distributorId).detail("Keys", keys).detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime);
|
|
}
|
|
|
|
int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().metrics.bytes : 0;
|
|
|
|
loop {
|
|
Optional<ShardMetrics> newMetrics;
|
|
if( !forwardComplete ) {
|
|
if( nextIter->range().end == allKeys.end ) {
|
|
forwardComplete = true;
|
|
continue;
|
|
}
|
|
++nextIter;
|
|
newMetrics = nextIter->value().stats->get();
|
|
|
|
// If going forward, give up when the next shard's stats are not yet present.
|
|
if( !newMetrics.present() || shardCount + newMetrics.get().shardCount >= CLIENT_KNOBS->SHARD_COUNT_LIMIT ) {
|
|
--nextIter;
|
|
forwardComplete = true;
|
|
continue;
|
|
}
|
|
} else {
|
|
--prevIter;
|
|
newMetrics = prevIter->value().stats->get();
|
|
|
|
// If going backward, stop when the stats are not present or if the shard is already over the merge
|
|
// bounds. If this check triggers right away (if we have not merged anything) then return a trigger
|
|
// on the previous shard changing "size".
|
|
if( !newMetrics.present() || shardCount + newMetrics.get().shardCount >= CLIENT_KNOBS->SHARD_COUNT_LIMIT ) {
|
|
if( shardsMerged == 1 ) {
|
|
TEST( true ); // shardMerger cannot merge anything
|
|
return brokenPromiseToReady( prevIter->value().stats->onChange() );
|
|
}
|
|
|
|
++prevIter;
|
|
break;
|
|
}
|
|
}
|
|
|
|
merged = KeyRangeRef( prevIter->range().begin, nextIter->range().end );
|
|
endingStats += newMetrics.get().metrics;
|
|
shardCount += newMetrics.get().shardCount;
|
|
lastLowBandwidthStartTime = newMetrics.get().lastLowBandwidthStartTime;
|
|
if((forwardComplete ? prevIter->range().begin : nextIter->range().begin) >= systemKeys.begin) {
|
|
systemBytes += newMetrics.get().metrics.bytes;
|
|
}
|
|
shardsMerged++;
|
|
|
|
auto shardBounds = getShardSizeBounds( merged, maxShardSize );
|
|
// If we just recently get the current shard's metrics (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), it means
|
|
// the shard's metric may not be stable yet. So we cannot continue merging in this direction.
|
|
if( endingStats.bytes >= shardBounds.min.bytes ||
|
|
getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
|
|
now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
|
|
shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT ) {
|
|
// The merged range is larger than the min bounds so we cannot continue merging in this direction.
|
|
// This means that:
|
|
// 1. If we were going forwards (the starting direction), we roll back the last speculative merge.
|
|
// In this direction we do not want to go above this boundary since we will merge at least one in
|
|
// the other direction, even when that goes over the bounds.
|
|
// 2. If we were going backwards we always want to merge one more shard on (to make sure we go over
|
|
// the shard min bounds) so we "break" without resetting the merged range.
|
|
if( forwardComplete )
|
|
break;
|
|
|
|
// If going forward, remove most recently added range
|
|
endingStats -= newMetrics.get().metrics;
|
|
shardCount -= newMetrics.get().shardCount;
|
|
if(nextIter->range().begin >= systemKeys.begin) {
|
|
systemBytes -= newMetrics.get().metrics.bytes;
|
|
}
|
|
shardsMerged--;
|
|
--nextIter;
|
|
merged = KeyRangeRef( prevIter->range().begin, nextIter->range().end );
|
|
forwardComplete = true;
|
|
}
|
|
}
|
|
|
|
//restarting shard tracker will derefenced values in the shard map, so make a copy
|
|
KeyRange mergeRange = merged;
|
|
|
|
// OldKeys: Shards in the key range are merged as one shard defined by NewKeys;
|
|
// NewKeys: New key range after shards are merged;
|
|
// EndingSize: The new merged shard size in bytes;
|
|
// BatchedMerges: The number of shards merged. Each shard is defined in self->shards;
|
|
// LastLowBandwidthStartTime: When does a shard's bandwidth status becomes BandwidthStatusLow. If a shard's status
|
|
// becomes BandwidthStatusLow less than DD_LOW_BANDWIDTH_DELAY ago, the merging logic will stop at the shard;
|
|
// ShardCount: The number of non-splittable shards that are merged. Each shard is defined in self->shards may have
|
|
// more than 1 shards.
|
|
TraceEvent("RelocateShardMergeMetrics", self->distributorId)
|
|
.detail("OldKeys", keys)
|
|
.detail("NewKeys", mergeRange)
|
|
.detail("EndingSize", endingStats.bytes)
|
|
.detail("BatchedMerges", shardsMerged)
|
|
.detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime)
|
|
.detail("ShardCount", shardCount);
|
|
|
|
if(mergeRange.begin < systemKeys.begin) {
|
|
self->systemSizeEstimate -= systemBytes;
|
|
}
|
|
restartShardTrackers( self, mergeRange, ShardMetrics(endingStats, lastLowBandwidthStartTime, shardCount) );
|
|
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
|
|
self->output.send( RelocateShard( mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD ) );
|
|
|
|
// We are about to be cancelled by the call to restartShardTrackers
|
|
return Void();
|
|
}
|
|
|
|
ACTOR Future<Void> shardEvaluator(
|
|
DataDistributionTracker* self,
|
|
KeyRange keys,
|
|
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
|
|
Reference<HasBeenTrueFor> wantsToMerge)
|
|
{
|
|
Future<Void> onChange = shardSize->onChange() || yieldedFuture(self->maxShardSize->onChange());
|
|
|
|
// There are the bounds inside of which we are happy with the shard size.
|
|
// getShardSizeBounds() will allways have shardBounds.min.bytes == 0 for shards that start at allKeys.begin,
|
|
// so will will never attempt to merge that shard with the one previous.
|
|
ShardSizeBounds shardBounds = getShardSizeBounds(keys, self->maxShardSize->get().get());
|
|
StorageMetrics const& stats = shardSize->get().get().metrics;
|
|
auto bandwidthStatus = getBandwidthStatus( stats );
|
|
|
|
bool shouldSplit = stats.bytes > shardBounds.max.bytes ||
|
|
(bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin );
|
|
bool shouldMerge = stats.bytes < shardBounds.min.bytes &&
|
|
bandwidthStatus == BandwidthStatusLow;
|
|
|
|
// Every invocation must set this or clear it
|
|
if(shouldMerge && !self->anyZeroHealthyTeams->get()) {
|
|
auto whenLongEnough = wantsToMerge->set(shardSize->get().get().lastLowBandwidthStartTime);
|
|
if( !wantsToMerge->hasBeenTrueForLongEnough() ) {
|
|
onChange = onChange || whenLongEnough;
|
|
}
|
|
} else {
|
|
wantsToMerge->clear();
|
|
if(shouldMerge) {
|
|
onChange = onChange || self->anyZeroHealthyTeams->onChange();
|
|
}
|
|
}
|
|
|
|
/*TraceEvent("EdgeCaseTraceShardEvaluator", self->distributorId)
|
|
// .detail("TrackerId", trackerID)
|
|
.detail("BeginKey", keys.begin.printableNonNull())
|
|
.detail("EndKey", keys.end.printableNonNull())
|
|
.detail("ShouldSplit", shouldSplit)
|
|
.detail("ShouldMerge", shouldMerge)
|
|
.detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough())
|
|
.detail("CurrentMetrics", stats.toString())
|
|
.detail("ShardBoundsMaxBytes", shardBounds.max.bytes)
|
|
.detail("ShardBoundsMinBytes", shardBounds.min.bytes)
|
|
.detail("WriteBandwitdhStatus", bandwidthStatus)
|
|
.detail("SplitBecauseHighWriteBandWidth", ( bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin ) ? "Yes" :"No");*/
|
|
|
|
if(!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) {
|
|
onChange = onChange || shardMerger( self, keys, shardSize );
|
|
}
|
|
if( shouldSplit ) {
|
|
onChange = onChange || shardSplitter( self, keys, shardSize, shardBounds );
|
|
}
|
|
|
|
wait( onChange );
|
|
return Void();
|
|
}
|
|
|
|
ACTOR Future<Void> shardTracker(DataDistributionTracker::SafeAccessor self, KeyRange keys,
|
|
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize) {
|
|
wait(yieldedFuture(self()->readyToStart.getFuture()));
|
|
|
|
if( !shardSize->get().present() )
|
|
wait( shardSize->onChange() );
|
|
|
|
if (!self()->maxShardSize->get().present()) wait(yieldedFuture(self()->maxShardSize->onChange()));
|
|
|
|
// Since maxShardSize will become present for all shards at once, avoid slow tasks with a short delay
|
|
wait( delay( 0, TaskPriority::DataDistribution ) );
|
|
|
|
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
|
|
state Reference<HasBeenTrueFor> wantsToMerge( new HasBeenTrueFor( shardSize->get() ) );
|
|
|
|
/*TraceEvent("ShardTracker", self()->distributorId)
|
|
.detail("Begin", keys.begin)
|
|
.detail("End", keys.end)
|
|
.detail("TrackerID", trackerID)
|
|
.detail("MaxBytes", self()->maxShardSize->get().get())
|
|
.detail("ShardSize", shardSize->get().get().bytes)
|
|
.detail("BytesPerKSec", shardSize->get().get().bytesPerKSecond);*/
|
|
|
|
try {
|
|
loop {
|
|
// Use the current known size to check for (and start) splits and merges.
|
|
wait(shardEvaluator(self(), keys, shardSize, wantsToMerge));
|
|
|
|
// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
|
|
// delay(0) mitigates the resulting SlowTask
|
|
wait( delay(0, TaskPriority::DataDistribution) );
|
|
}
|
|
} catch (Error& e) {
|
|
if (e.code() != error_code_actor_cancelled && e.code() != error_code_dd_tracker_cancelled) {
|
|
self()->output.sendError(e); // Propagate failure to dataDistributionTracker
|
|
}
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingMetrics) {
|
|
auto ranges = self->shards.getAffectedRangesAfterInsertion(keys, ShardTrackedData());
|
|
for(int i=0; i<ranges.size(); i++) {
|
|
if( !ranges[i].value.trackShard.isValid() && ranges[i].begin != keys.begin ) {
|
|
// When starting, key space will be full of "dummy" default contructed entries.
|
|
// This should happen when called from trackInitialShards()
|
|
ASSERT( !self->readyToStart.isSet() );
|
|
continue;
|
|
}
|
|
|
|
auto shardMetrics = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
|
|
|
// For the case where the new tracker will take over at the boundaries of current shard(s)
|
|
// we can use the old size if it is available. This will be the case when merging shards.
|
|
if (startingMetrics.present()) {
|
|
ASSERT( ranges.size() == 1 );
|
|
/*TraceEvent("ShardTrackerSizePreset", self->distributorId)
|
|
.detail("Keys", keys)
|
|
.detail("Size", startingMetrics.get().metrics.bytes)
|
|
.detail("Merges", startingMetrics.get().merges);*/
|
|
TEST( true ); // shardTracker started with trackedBytes already set
|
|
shardMetrics->set(startingMetrics);
|
|
}
|
|
|
|
ShardTrackedData data;
|
|
data.stats = shardMetrics;
|
|
data.trackShard = shardTracker(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics);
|
|
data.trackBytes = trackShardMetrics(DataDistributionTracker::SafeAccessor(self), ranges[i], shardMetrics);
|
|
self->shards.insert( ranges[i], data );
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> trackInitialShards(DataDistributionTracker *self, Reference<InitialDataDistribution> initData)
|
|
{
|
|
TraceEvent("TrackInitialShards", self->distributorId).detail("InitialShardCount", initData->shards.size());
|
|
|
|
//This line reduces the priority of shard initialization to prevent interference with failure monitoring.
|
|
//SOMEDAY: Figure out what this priority should actually be
|
|
wait( delay( 0.0, TaskPriority::DataDistribution ) );
|
|
|
|
state int s;
|
|
for(s=0; s<initData->shards.size()-1; s++) {
|
|
restartShardTrackers( self, KeyRangeRef( initData->shards[s].key, initData->shards[s+1].key ) );
|
|
wait( yield( TaskPriority::DataDistribution ) );
|
|
}
|
|
|
|
Future<Void> initialSize = changeSizes( self, KeyRangeRef(allKeys.begin, allKeys.end), 0 );
|
|
self->readyToStart.send(Void());
|
|
wait( initialSize );
|
|
self->maxShardSizeUpdater = updateMaxShardSize( self->dbSizeEstimate, self->maxShardSize );
|
|
|
|
return Void();
|
|
}
|
|
|
|
ACTOR Future<Void> fetchShardMetrics_impl( DataDistributionTracker* self, GetMetricsRequest req ) {
|
|
try {
|
|
loop {
|
|
Future<Void> onChange;
|
|
StorageMetrics returnMetrics;
|
|
for (auto t : self->shards.intersectingRanges(req.keys)) {
|
|
auto &stats = t.value().stats;
|
|
if( !stats->get().present() ) {
|
|
onChange = stats->onChange();
|
|
break;
|
|
}
|
|
returnMetrics += t.value().stats->get().get().metrics;
|
|
}
|
|
|
|
if( !onChange.isValid() ) {
|
|
req.reply.send( returnMetrics );
|
|
return Void();
|
|
}
|
|
|
|
wait( onChange );
|
|
}
|
|
} catch( Error &e ) {
|
|
if( e.code() != error_code_actor_cancelled && !req.reply.isSet() )
|
|
req.reply.sendError(e);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsRequest req ) {
|
|
choose {
|
|
when( wait( fetchShardMetrics_impl( self, req ) ) ) {}
|
|
when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT, TaskPriority::DataDistribution ) ) ) {
|
|
TEST(true); // DD_SHARD_METRICS_TIMEOUT
|
|
StorageMetrics largeMetrics;
|
|
largeMetrics.bytes = SERVER_KNOBS->MAX_SHARD_BYTES;
|
|
req.reply.send( largeMetrics );
|
|
}
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
ACTOR Future<Void> fetchShardMetricsList_impl( DataDistributionTracker* self, GetMetricsListRequest req ) {
|
|
try {
|
|
loop {
|
|
// used to control shard limit
|
|
int shardNum = 0;
|
|
// list of metrics, regenerate on loop when full range unsuccessful
|
|
Standalone<VectorRef<DDMetricsRef>> result;
|
|
Future<Void> onChange;
|
|
auto beginIter = self->shards.containedRanges(req.keys).begin();
|
|
auto endIter = self->shards.intersectingRanges(req.keys).end();
|
|
for (auto t = beginIter; t != endIter; ++t) {
|
|
auto &stats = t.value().stats;
|
|
if( !stats->get().present() ) {
|
|
onChange = stats->onChange();
|
|
break;
|
|
}
|
|
result.push_back_deep(result.arena(),
|
|
DDMetricsRef(stats->get().get().metrics.bytes, KeyRef(t.begin().toString())));
|
|
++shardNum;
|
|
if (shardNum >= req.shardLimit) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if( !onChange.isValid() ) {
|
|
req.reply.send( result );
|
|
return Void();
|
|
}
|
|
|
|
wait( onChange );
|
|
}
|
|
} catch( Error &e ) {
|
|
if( e.code() != error_code_actor_cancelled && !req.reply.isSet() )
|
|
req.reply.sendError(e);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR Future<Void> fetchShardMetricsList( DataDistributionTracker* self, GetMetricsListRequest req ) {
|
|
choose {
|
|
when( wait( fetchShardMetricsList_impl( self, req ) ) ) {}
|
|
when( wait( delay( SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT ) ) ) {
|
|
req.reply.sendError(timed_out());
|
|
}
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
|
|
PromiseStream<RelocateShard> output,
|
|
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
|
|
PromiseStream<GetMetricsRequest> getShardMetrics,
|
|
PromiseStream<GetMetricsListRequest> getShardMetricsList,
|
|
FutureStream<Promise<int64_t>> getAverageShardBytes,
|
|
Promise<Void> readyToStart, Reference<AsyncVar<bool>> anyZeroHealthyTeams,
|
|
UID distributorId, KeyRangeMap<ShardTrackedData>* shards,
|
|
bool const* trackerCancelled) {
|
|
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure,
|
|
anyZeroHealthyTeams, *shards, *trackerCancelled);
|
|
state Future<Void> loggingTrigger = Void();
|
|
state Future<Void> readHotDetect = readHotDetector(&self);
|
|
try {
|
|
wait( trackInitialShards( &self, initData ) );
|
|
initData = Reference<InitialDataDistribution>();
|
|
|
|
loop choose {
|
|
when( Promise<int64_t> req = waitNext( getAverageShardBytes ) ) {
|
|
req.send( self.maxShardSize->get().get() / 2 );
|
|
}
|
|
when( wait( loggingTrigger ) ) {
|
|
TraceEvent("DDTrackerStats", self.distributorId)
|
|
.detail("Shards", self.shards.size())
|
|
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
|
|
.detail("SystemSizeBytes", self.systemSizeEstimate)
|
|
.trackLatest("DDTrackerStats");
|
|
|
|
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
|
|
}
|
|
when( GetMetricsRequest req = waitNext( getShardMetrics.getFuture() ) ) {
|
|
self.sizeChanges.add( fetchShardMetrics( &self, req ) );
|
|
}
|
|
when( GetMetricsListRequest req = waitNext( getShardMetricsList.getFuture() ) ) {
|
|
self.sizeChanges.add( fetchShardMetricsList( &self, req ) );
|
|
}
|
|
when( wait( self.sizeChanges.getResult() ) ) {}
|
|
}
|
|
} catch (Error& e) {
|
|
TraceEvent(SevError, "DataDistributionTrackerError", self.distributorId).error(e);
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
vector<KeyRange> ShardsAffectedByTeamFailure::getShardsFor( Team team ) {
|
|
vector<KeyRange> r;
|
|
for(auto it = team_shards.lower_bound( std::pair<Team,KeyRange>( team, KeyRangeRef() ) ); it != team_shards.end() && it->first == team; ++it)
|
|
r.push_back( it->second );
|
|
return r;
|
|
}
|
|
|
|
bool ShardsAffectedByTeamFailure::hasShards(Team team) {
|
|
auto it = team_shards.lower_bound(std::pair<Team, KeyRange>(team, KeyRangeRef()));
|
|
return it != team_shards.end() && it->first == team;
|
|
}
|
|
|
|
int ShardsAffectedByTeamFailure::getNumberOfShards( UID ssID ) {
|
|
return storageServerShards[ssID];
|
|
}
|
|
|
|
std::pair<vector<ShardsAffectedByTeamFailure::Team>,vector<ShardsAffectedByTeamFailure::Team>> ShardsAffectedByTeamFailure::getTeamsFor( KeyRangeRef keys ) {
|
|
return shard_teams[keys.begin];
|
|
}
|
|
|
|
void ShardsAffectedByTeamFailure::erase(Team team, KeyRange const& range) {
|
|
if(team_shards.erase( std::pair<Team,KeyRange>(team, range) ) > 0) {
|
|
for (auto uid = team.servers.begin(); uid != team.servers.end(); ++uid) {
|
|
// Safeguard against going negative after eraseServer() sets value to 0
|
|
if (storageServerShards[*uid] > 0) {
|
|
storageServerShards[*uid]--;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void ShardsAffectedByTeamFailure::eraseServer(UID ssID) {
|
|
storageServerShards[ssID] = 0;
|
|
}
|
|
|
|
void ShardsAffectedByTeamFailure::insert(Team team, KeyRange const& range) {
|
|
if(team_shards.insert( std::pair<Team,KeyRange>( team, range ) ).second) {
|
|
for(auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)
|
|
storageServerShards[*uid]++;
|
|
}
|
|
}
|
|
|
|
void ShardsAffectedByTeamFailure::defineShard( KeyRangeRef keys ) {
|
|
std::vector<Team> teams;
|
|
std::vector<Team> prevTeams;
|
|
auto rs = shard_teams.intersectingRanges(keys);
|
|
for(auto it = rs.begin(); it != rs.end(); ++it) {
|
|
for(auto t=it->value().first.begin(); t!=it->value().first.end(); ++t) {
|
|
teams.push_back( *t );
|
|
erase(*t, it->range());
|
|
}
|
|
for(auto t=it->value().second.begin(); t!=it->value().second.end(); ++t) {
|
|
prevTeams.push_back( *t );
|
|
}
|
|
}
|
|
uniquify(teams);
|
|
uniquify(prevTeams);
|
|
|
|
/*TraceEvent("ShardsAffectedByTeamFailureDefine")
|
|
.detail("KeyBegin", keys.begin)
|
|
.detail("KeyEnd", keys.end)
|
|
.detail("TeamCount", teams.size());*/
|
|
|
|
auto affectedRanges = shard_teams.getAffectedRangesAfterInsertion(keys);
|
|
shard_teams.insert( keys, std::make_pair(teams, prevTeams) );
|
|
|
|
for(auto r=affectedRanges.begin(); r != affectedRanges.end(); ++r) {
|
|
auto& t = shard_teams[r->begin];
|
|
for(auto it=t.first.begin(); it!=t.first.end(); ++it) {
|
|
insert(*it, *r);
|
|
}
|
|
}
|
|
check();
|
|
}
|
|
|
|
// Move keys to destinationTeams by updating shard_teams
|
|
void ShardsAffectedByTeamFailure::moveShard( KeyRangeRef keys, std::vector<Team> destinationTeams ) {
|
|
/*TraceEvent("ShardsAffectedByTeamFailureMove")
|
|
.detail("KeyBegin", keys.begin)
|
|
.detail("KeyEnd", keys.end)
|
|
.detail("NewTeamSize", destinationTeam.size())
|
|
.detail("NewTeam", describe(destinationTeam));*/
|
|
|
|
auto ranges = shard_teams.intersectingRanges( keys );
|
|
std::vector< std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange> > modifiedShards;
|
|
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
|
|
if( keys.contains( it->range() ) ) {
|
|
// erase the many teams that were associated with this one shard
|
|
for(auto t = it->value().first.begin(); t != it->value().first.end(); ++t) {
|
|
erase(*t, it->range());
|
|
}
|
|
|
|
// save this modification for later insertion
|
|
std::vector<Team> prevTeams = it->value().second;
|
|
prevTeams.insert(prevTeams.end(), it->value().first.begin(), it->value().first.end());
|
|
uniquify(prevTeams);
|
|
|
|
modifiedShards.push_back( std::pair<std::pair<std::vector<Team>,std::vector<Team>>,KeyRange>( std::make_pair(destinationTeams, prevTeams), it->range() ) );
|
|
} else {
|
|
// for each range that touches this move, add our team as affecting this range
|
|
for(auto& team : destinationTeams) {
|
|
insert(team, it->range());
|
|
}
|
|
|
|
// if we are not in the list of teams associated with this shard, add us in
|
|
auto& teams = it->value();
|
|
teams.second.insert(teams.second.end(), teams.first.begin(), teams.first.end());
|
|
uniquify(teams.second);
|
|
|
|
teams.first.insert(teams.first.end(), destinationTeams.begin(), destinationTeams.end());
|
|
uniquify(teams.first);
|
|
}
|
|
}
|
|
|
|
// we cannot modify the KeyRangeMap while iterating through it, so add saved modifications now
|
|
for( int i = 0; i < modifiedShards.size(); i++ ) {
|
|
for( auto& t : modifiedShards[i].first.first) {
|
|
insert(t, modifiedShards[i].second);
|
|
}
|
|
shard_teams.insert( modifiedShards[i].second, modifiedShards[i].first );
|
|
}
|
|
|
|
check();
|
|
}
|
|
|
|
void ShardsAffectedByTeamFailure::finishMove( KeyRangeRef keys ) {
|
|
auto ranges = shard_teams.containedRanges(keys);
|
|
for(auto it = ranges.begin(); it != ranges.end(); ++it) {
|
|
it.value().second.clear();
|
|
}
|
|
}
|
|
|
|
void ShardsAffectedByTeamFailure::check() {
|
|
if (EXPENSIVE_VALIDATION) {
|
|
for(auto t = team_shards.begin(); t != team_shards.end(); ++t) {
|
|
auto i = shard_teams.rangeContaining(t->second.begin);
|
|
if (i->range() != t->second ||
|
|
!std::count(i->value().first.begin(), i->value().first.end(), t->first))
|
|
{
|
|
ASSERT(false);
|
|
}
|
|
}
|
|
auto rs = shard_teams.ranges();
|
|
for(auto i = rs.begin(); i != rs.end(); ++i)
|
|
for(vector<Team>::iterator t = i->value().first.begin(); t != i->value().first.end(); ++t)
|
|
if (!team_shards.count( std::make_pair( *t, i->range() ) )) {
|
|
std::string teamDesc, shards;
|
|
for(int k=0; k<t->servers.size(); k++)
|
|
teamDesc += format("%llx ", t->servers[k].first());
|
|
for(auto x = team_shards.lower_bound( std::make_pair( *t, KeyRangeRef() ) ); x != team_shards.end() && x->first == *t; ++x)
|
|
shards += printable(x->second.begin) + "-" + printable(x->second.end) + ",";
|
|
TraceEvent(SevError,"SATFInvariantError2")
|
|
.detail("KB", i->begin())
|
|
.detail("KE", i->end())
|
|
.detail("Team", teamDesc)
|
|
.detail("Shards", shards);
|
|
ASSERT(false);
|
|
}
|
|
}
|
|
}
|