change priority knob; change PromiseStream to FutureStream; remove comments; add on_sr check

This commit is contained in:
Xiaoxi Wang 2022-05-25 16:41:15 -07:00
parent cd40747261
commit 13a77dd5a2
7 changed files with 13 additions and 10 deletions

View File

@ -143,8 +143,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
init( PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM, 122 );
init( PRIORITY_REBALANCE_READ_UNDERUTIL_TEAM, 121 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 122 );
init( PRIORITY_REBALANCE_READ_OVERUTIL_TEAM, 123 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 139 );
init( PRIORITY_TEAM_HEALTHY, 140 );

View File

@ -736,7 +736,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
output,
shardsAffectedByTeamFailure,
getShardMetrics,
getTopKShardMetrics,
getTopKShardMetrics.getFuture(),
getShardMetricsList,
getAverageShardBytes.getFuture(),
readyToStart,

View File

@ -321,7 +321,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetTopKMetricsRequest> getTopKMetrics,
FutureStream<GetTopKMetricsRequest> getTopKMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,

View File

@ -1733,7 +1733,8 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
if (!val.present()) {
skipCurrentLoop = false;
} else {
if (val.get().size() > 0) {
// NOTE: check special value "" and "on" might written in old version < 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
if (readRebalance) {
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
@ -1845,7 +1846,8 @@ ACTOR Future<Void> BgDDMountainChopper(DDQueueData* self, int teamCollectionInde
}
skipCurrentLoop = false;
} else {
if (val.get().size() > 0) {
// NOTE: check special value "" and "on" might written in old version < 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
} else {
@ -1968,7 +1970,8 @@ ACTOR Future<Void> BgDDValleyFiller(DDQueueData* self, int teamCollectionIndex)
}
skipCurrentLoop = false;
} else {
if (val.get().size() > 0) {
// NOTE: check special value "" and "on" might written in old version < 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
} else {

View File

@ -1004,7 +1004,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetTopKMetricsRequest> getTopKMetrics,
FutureStream<GetTopKMetricsRequest> getTopKMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
@ -1043,7 +1043,7 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
when(GetMetricsRequest req = waitNext(getShardMetrics.getFuture())) {
self.sizeChanges.add(fetchShardMetrics(&self, req));
}
when(GetTopKMetricsRequest req = waitNext(getTopKMetrics.getFuture())) {
when(GetTopKMetricsRequest req = waitNext(getTopKMetrics)) {
self.sizeChanges.add(fetchTopKShardMetrics(&self, req));
}
when(GetMetricsListRequest req = waitNext(getShardMetricsList.getFuture())) {

View File

@ -2995,6 +2995,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
}
if (loadResult.get().rebalanceDDIgnored) {
// TODO: change the hex string to human-friendly fields like "rebalance_read", "rebalance_disk"
statusObj["data_distribution_disabled_for_rebalance"] = true;
statusObj["data_distribution_disabled_hex"] = loadResult.get().rebalanceDDIgnoreHex;
}

View File

@ -401,7 +401,6 @@ double TCTeamInfo::getLoadReadBandwidth(bool includeInFlight, double inflightPen
}
}
return (size == 0 ? 0 : sum / size) +
// we don't need to divide the inflight bandwidth because when added it the bandwidth is from single server
(includeInFlight ? inflightPenalty * getReadInFlightToTeam() / servers.size() : 0);
}