Credit to Evan for pointing out the missing line which costs me weeks debugging some weird behaviors.

This commit is contained in:
Xin Dong 2019-10-18 16:46:19 -07:00
parent 41aae9cbd9
commit 6a40ef25e5
1 changed files with 13 additions and 7 deletions

View File

@ -201,11 +201,13 @@ ACTOR Future<Void> trackShardBytes(
bounds.max.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC *
(1.0 + SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC_JITTER);
bounds.min.bytesReadPerKSecond = 0;
bounds.permittedError.bytesReadPerKSecond = bounds.min.bytesReadPerKSecond / 4;
} else if (newReadBandwithStatus == ReadBandwithStatusHigh) {
TEST(true);
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
bounds.min.bytesReadPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC *
(1.0 - SERVER_KNOBS->SHARD_MAX_BYTES_READ_PER_KSEC_JITTER);
bounds.permittedError.bytesReadPerKSecond = bounds.min.bytesReadPerKSecond / 4;
} else {
ASSERT(false);
}
@ -215,9 +217,10 @@ ACTOR Future<Void> trackShardBytes(
bounds.permittedError.bytes = -1;
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
bounds.min.bytesReadPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
bounds.permittedError.bytesReadPerKSecond = bounds.permittedError.infinity;
}
bounds.max.iosPerKSecond = bounds.max.infinity;
@ -374,6 +377,7 @@ ACTOR Future<Void> shardSplitter(
splitMetrics.bytes = shardBounds.max.bytes / 2;
splitMetrics.bytesPerKSecond = keys.begin >= keyServersKeys.begin ? splitMetrics.infinity : SERVER_KNOBS->SHARD_SPLIT_BYTES_PER_KSEC;
splitMetrics.iosPerKSecond = splitMetrics.infinity;
splitMetrics.bytesReadPerKSecond = splitMetrics.infinity; // Don't split by readBandwidth
state Standalone<VectorRef<KeyRef>> splitKeys = wait( getSplitKeys(self, keys, splitMetrics, metrics ) );
//fprintf(stderr, "split keys:\n");
@ -531,11 +535,12 @@ ACTOR Future<Void> shardEvaluator(
// so will will never attempt to merge that shard with the one previous.
ShardSizeBounds shardBounds = getShardSizeBounds(keys, self->maxShardSize->get().get());
StorageMetrics const& stats = shardSize->get().get();
auto bandwidthStatus = getBandwidthStatus( stats );
bool shouldSplit = stats.bytes > shardBounds.max.bytes ||
( getBandwidthStatus( stats ) == BandwidthStatusHigh && keys.begin < keyServersKeys.begin );
(bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin );
bool shouldMerge = stats.bytes < shardBounds.min.bytes &&
getBandwidthStatus( stats ) == BandwidthStatusLow;
bandwidthStatus == BandwidthStatusLow;
// Every invocation must set this or clear it
if(shouldMerge && !self->anyZeroHealthyTeams->get()) {
@ -550,17 +555,18 @@ ACTOR Future<Void> shardEvaluator(
}
}
/*TraceEvent("ShardEvaluator", self->distributorId)
/*TraceEvent("EdgeCaseTraceShardEvaluator", self->distributorId)
// .detail("TrackerId", trackerID)
.detail("BeginKey", keys.begin.printable())
.detail("EndKey", keys.end.printable())
.detail("BeginKey", keys.begin.printableNonNull())
.detail("EndKey", keys.end.printableNonNull())
.detail("ShouldSplit", shouldSplit)
.detail("ShouldMerge", shouldMerge)
.detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough())
.detail("CurrentMetrics", stats.toString())
.detail("ShardBoundsMaxBytes", shardBounds.max.bytes)
.detail("ShardBoundsMinBytes", shardBounds.min.bytes)
.detail("WriteBandwitdhStatus", getBandwidthStatus(stats));*/
.detail("WriteBandwitdhStatus", bandwidthStatus)
.detail("SplitBecauseHighWriteBandWidth", ( bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin ) ? "Yes" :"No");*/
if(!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) {
onChange = onChange || shardMerger( self, keys, shardSize );