fix DataDistributionQueue time_out ; reset the rebalance poll time

This commit is contained in:
Xiaoxi Wang 2022-05-04 14:11:20 -07:00
parent a3d0b005dc
commit ae66ed6c16
2 changed files with 14 additions and 5 deletions

View File

@ -1733,7 +1733,7 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueueData* self, int teamCollectionIndex,
try {
// FIXME: change back to BG_REBALANCE_SWITCH_CHECK_INTERVAL after test
delayF = delay(0.1, TaskPriority::DataDistributionLaunch);
delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
@ -2190,7 +2190,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
debug_setCheckRelocationDuration(false);
}
}
when(KeyRange done = waitNext(rangesComplete.getFuture())) { keysToLaunchFrom = done; }
when(KeyRange done = waitNext(rangesComplete.getFuture())) {
keysToLaunchFrom = done;
}
when(wait(recordMetrics)) {
Promise<int64_t> req;
getAverageShardBytes.send(req);
@ -2237,7 +2239,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(balancingFutures))) {}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) {
r.send(self.unhealthyRelocations);
}
}
}
} catch (Error& e) {

View File

@ -886,7 +886,10 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
ACTOR Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKMetricsRequest req) {
choose {
when(wait(fetchTopKShardMetrics_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) { req.reply.sendError(timed_out()); }
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
TEST(true); // TopK DD_SHARD_METRICS_TIMEOUT
req.reply.send(std::vector<StorageMetrics>(1));
}
}
return Void();
}
@ -973,7 +976,9 @@ ACTOR Future<Void> fetchShardMetricsList_impl(DataDistributionTracker* self, Get
ACTOR Future<Void> fetchShardMetricsList(DataDistributionTracker* self, GetMetricsListRequest req) {
choose {
when(wait(fetchShardMetricsList_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) { req.reply.sendError(timed_out()); }
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
req.reply.sendError(timed_out());
}
}
return Void();
}