wantsToMerge was created before the shardEvaluator has a chance to update it based on shardSize changes
This commit is contained in:
parent
fde53cbeef
commit
b331c5dafe
|
@ -310,7 +310,7 @@ ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRange keys, in
|
|||
return Void();
|
||||
}
|
||||
|
||||
struct HasBeenTrueFor : NonCopyable {
|
||||
struct HasBeenTrueFor : ReferenceCounted<HasBeenTrueFor> {
|
||||
explicit HasBeenTrueFor( Optional<ShardMetrics> value ) {
|
||||
if(value.present()) {
|
||||
trigger = delayJittered(std::max(0.0, SERVER_KNOBS->DD_MERGE_COALESCE_DELAY + value.get().lastLowBandwidthStartTime - now()), decrementPriority(TaskPriority::DataDistribution) ) || cleared.getFuture();
|
||||
|
@ -425,7 +425,7 @@ Future<Void> shardMerger(
|
|||
StorageMetrics endingStats = shardSize->get().get().metrics;
|
||||
double lastLowBandwidthStartTime = shardSize->get().get().lastLowBandwidthStartTime;
|
||||
if(FLOW_KNOBS->DELAY_JITTER_OFFSET*SERVER_KNOBS->DD_MERGE_COALESCE_DELAY > SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY && now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY) {
|
||||
TraceEvent( g_network->isSimulated() ? SevError : SevWarnAlways, "ShardMergeTooSoon", self->distributorId).detail("Keys", keys);
|
||||
TraceEvent( g_network->isSimulated() ? SevError : SevWarnAlways, "ShardMergeTooSoon", self->distributorId).detail("Keys", keys).detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime);
|
||||
}
|
||||
|
||||
int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().metrics.bytes : 0;
|
||||
|
@ -506,7 +506,8 @@ Future<Void> shardMerger(
|
|||
.detail("OldKeys", keys)
|
||||
.detail("NewKeys", mergeRange)
|
||||
.detail("EndingSize", endingStats.bytes)
|
||||
.detail("BatchedMerges", shardsMerged);
|
||||
.detail("BatchedMerges", shardsMerged)
|
||||
.detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime);
|
||||
|
||||
if(mergeRange.begin < systemKeys.begin) {
|
||||
self->systemSizeEstimate -= systemBytes;
|
||||
|
@ -523,7 +524,7 @@ ACTOR Future<Void> shardEvaluator(
|
|||
DataDistributionTracker* self,
|
||||
KeyRange keys,
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
|
||||
HasBeenTrueFor *wantsToMerge)
|
||||
Reference<HasBeenTrueFor> wantsToMerge)
|
||||
{
|
||||
Future<Void> onChange = shardSize->onChange() || yieldedFuture(self->maxShardSize->onChange());
|
||||
|
||||
|
@ -573,9 +574,6 @@ ACTOR Future<Void> shardTracker(
|
|||
KeyRange keys,
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize)
|
||||
{
|
||||
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
|
||||
state HasBeenTrueFor wantsToMerge( shardSize->get() );
|
||||
|
||||
wait( yieldedFuture(self->readyToStart.getFuture()) );
|
||||
|
||||
if( !shardSize->get().present() )
|
||||
|
@ -587,6 +585,9 @@ ACTOR Future<Void> shardTracker(
|
|||
// Since maxShardSize will become present for all shards at once, avoid slow tasks with a short delay
|
||||
wait( delay( 0, TaskPriority::DataDistribution ) );
|
||||
|
||||
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
|
||||
state Reference<HasBeenTrueFor> wantsToMerge( new HasBeenTrueFor( shardSize->get() ) );
|
||||
|
||||
/*TraceEvent("ShardTracker", self->distributorId)
|
||||
.detail("Begin", keys.begin)
|
||||
.detail("End", keys.end)
|
||||
|
@ -598,7 +599,7 @@ ACTOR Future<Void> shardTracker(
|
|||
try {
|
||||
loop {
|
||||
// Use the current known size to check for (and start) splits and merges.
|
||||
wait( shardEvaluator( self, keys, shardSize, &wantsToMerge ) );
|
||||
wait( shardEvaluator( self, keys, shardSize, wantsToMerge ) );
|
||||
|
||||
// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
|
||||
// delay(0) mitigates the resulting SlowTask
|
||||
|
|
Loading…
Reference in New Issue