solve merge problem; enable an empty top k unit tests
This commit is contained in:
parent
642f8f3a9f
commit
7dd1d7a029
|
@ -1339,8 +1339,11 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
|
|||
}
|
||||
|
||||
if (metrics.bytesReadPerKSecond > 0) {
|
||||
minReadLoad =
|
||||
std::min(metrics.bytesReadPerKSecond, std::max((decltype(minReadLoad))0, minReadLoad));
|
||||
if (minReadLoad == -1) {
|
||||
minReadLoad = metrics.bytesReadPerKSecond;
|
||||
} else {
|
||||
minReadLoad = std::min(metrics.bytesReadPerKSecond, minReadLoad);
|
||||
}
|
||||
maxReadLoad = std::max(metrics.bytesReadPerKSecond, maxReadLoad);
|
||||
if (req.minBytesReadPerKSecond <= metrics.bytesReadPerKSecond &&
|
||||
metrics.bytesReadPerKSecond <= req.maxBytesReadPerKSecond) {
|
||||
|
@ -1378,7 +1381,9 @@ ACTOR Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, Get
|
|||
|
||||
ACTOR Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKMetricsRequest req) {
|
||||
choose {
|
||||
when(wait(fetchTopKShardMetrics_impl(self, req))) {}
|
||||
// simulate time_out
|
||||
when(wait(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01) ? Never()
|
||||
: fetchTopKShardMetrics_impl(self, req))) {}
|
||||
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
|
||||
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT");
|
||||
req.reply.send(GetTopKMetricsReply());
|
||||
|
@ -2086,25 +2091,23 @@ void PhysicalShardCollection::logPhysicalShardCollection() {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* FIXME: enable this test in the future
|
||||
// FIXME: complete this test with non-empty range
|
||||
TEST_CASE("/DataDistributor/Tracker/FetchTopK") {
|
||||
state DataDistributionTracker self;
|
||||
state std::vector<KeyRange> ranges;
|
||||
for (int i = 1; i <= 10; i += 2) {
|
||||
ranges.emplace_back(KeyRangeRef(doubleToTestKey(i), doubleToTestKey(i + 2)));
|
||||
std::cout << "add range: " << ranges.back().begin.toString() << "\n";
|
||||
}
|
||||
state GetTopKMetricsRequest req(ranges, 3, 1000, 100000);
|
||||
state DataDistributionTracker self;
|
||||
state std::vector<KeyRange> ranges;
|
||||
// for (int i = 1; i <= 10; i += 2) {
|
||||
// ranges.emplace_back(KeyRangeRef(doubleToTestKey(i), doubleToTestKey(i + 2)));
|
||||
// std::cout << "add range: " << ranges.back().begin.toString() << "\n";
|
||||
// }
|
||||
state GetTopKMetricsRequest req(ranges, 3, 1000, 100000);
|
||||
|
||||
double targetDensities[10] = { 2, 1, 3, 5, 4, 10, 6, 8, 7, 0 };
|
||||
for (int i = 0; i <= 5; ++i) {
|
||||
}
|
||||
wait(fetchTopKShardMetrics_impl(&self, req));
|
||||
auto& reply = req.reply.getFuture().get();
|
||||
ASSERT(reply.shardMetrics.empty());
|
||||
ASSERT(reply.maxReadLoad == -1);
|
||||
ASSERT(reply.minReadLoad == -1);
|
||||
// double targetDensities[10] = { 2, 1, 3, 5, 4, 10, 6, 8, 7, 0 };
|
||||
|
||||
return Void();
|
||||
} */
|
||||
wait(fetchTopKShardMetrics(&self, req));
|
||||
auto& reply = req.reply.getFuture().get();
|
||||
ASSERT(reply.shardMetrics.empty());
|
||||
ASSERT(reply.maxReadLoad == -1);
|
||||
ASSERT(reply.minReadLoad == -1);
|
||||
|
||||
return Void();
|
||||
}
|
|
@ -629,3 +629,8 @@ void DDMockTxnProcessor::setupMockGlobalState(Reference<InitialDataDistribution>
|
|||
|
||||
mgs->shardMapping->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::Normal);
|
||||
}
|
||||
|
||||
// FIXME: finish moveKeys implementation
|
||||
Future<Void> DDMockTxnProcessor::moveKeys(const MoveKeysParams& params) const {
|
||||
UNREACHABLE();
|
||||
}
|
||||
|
|
|
@ -173,6 +173,8 @@ public:
|
|||
|
||||
// test only
|
||||
void setupMockGlobalState(Reference<InitialDataDistribution> initData);
|
||||
|
||||
Future<Void> moveKeys(const MoveKeysParams& params) const override;
|
||||
};
|
||||
|
||||
#endif // FOUNDATIONDB_DDTXNPROCESSOR_H
|
||||
|
|
Loading…
Reference in New Issue