From 0d4b4d05e20c320004396f5f0114f1539eb1c44b Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Fri, 21 Oct 2022 16:53:03 -0700 Subject: [PATCH] implement MSS as IStorageMetricsService and pass the unit test --- fdbserver/DDShardTracker.actor.cpp | 16 +---- fdbserver/MockGlobalState.actor.cpp | 69 +++++++++++++++++++ .../fdbserver/DataDistribution.actor.h | 14 ++++ fdbserver/include/fdbserver/MockGlobalState.h | 34 ++++++++- 4 files changed, 115 insertions(+), 18 deletions(-) diff --git a/fdbserver/DDShardTracker.actor.cpp b/fdbserver/DDShardTracker.actor.cpp index be7343ba4c..7964915217 100644 --- a/fdbserver/DDShardTracker.actor.cpp +++ b/fdbserver/DDShardTracker.actor.cpp @@ -212,7 +212,7 @@ ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys, const Reference>>& shardMetrics, const BandwidthStatus& bandwidthStatus, PromiseStream readHotShard) { - ShardSizeBounds bounds; + ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack(); if (shardMetrics->get().present()) { auto bytes = shardMetrics->get().get().metrics.bytes; auto readBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics); @@ -259,21 +259,7 @@ ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys, } else { ASSERT(false); } - } else { - bounds.max.bytes = -1; - bounds.min.bytes = -1; - bounds.permittedError.bytes = -1; - bounds.max.bytesPerKSecond = bounds.max.infinity; - bounds.min.bytesPerKSecond = 0; - bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity; - bounds.max.bytesReadPerKSecond = bounds.max.infinity; - bounds.min.bytesReadPerKSecond = 0; - bounds.permittedError.bytesReadPerKSecond = bounds.permittedError.infinity; } - - bounds.max.iosPerKSecond = bounds.max.infinity; - bounds.min.iosPerKSecond = 0; - bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity; return bounds; } diff --git a/fdbserver/MockGlobalState.actor.cpp b/fdbserver/MockGlobalState.actor.cpp index bdeed264fd..1e03b71e85 100644 --- a/fdbserver/MockGlobalState.actor.cpp +++ b/fdbserver/MockGlobalState.actor.cpp @@ -20,6 +20,7 @@ #include "fdbserver/MockGlobalState.h" #include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/DataDistribution.actor.h" #include "flow/actorcompiler.h" class MockGlobalStateImpl { @@ -42,6 +43,7 @@ public: UseProvisionalProxies::False, 0) .get(); + TraceEvent(SevDebug, "MGSWaitStorageMetrics").detail("Phase", "GetLocation"); // NOTE(xwang): in native API, there's code handling the non-equal situation, but I think in mock world // there shouldn't have any delay to update the locations. ASSERT_EQ(expectedShardCount, locations.size()); @@ -92,6 +94,28 @@ public: } }; +class MockStorageServerImpl { +public: + ACTOR static Future waitMetricsTenantAware(MockStorageServer* self, WaitMetricsRequest req) { + if (req.tenantInfo.present() && req.tenantInfo.get().tenantId != TenantInfo::INVALID_TENANT) { + // TODO(xwang) add support for tenant test, search for tenant entry + Optional entry; + Optional tenantPrefix = entry.map([](TenantMapEntry e) { return e.prefix; }); + if (tenantPrefix.present()) { + UNREACHABLE(); + // req.keys = req.keys.withPrefix(tenantPrefix.get(), req.arena); + } + } + + if (!self->isReadable(req.keys)) { + self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty()); + } else { + wait(self->metrics.waitMetrics(req, delayJittered(SERVER_KNOBS->STORAGE_METRIC_TIMEOUT))); + } + return Void(); + } +}; + bool MockStorageServer::allShardStatusEqual(KeyRangeRef range, MockShardStatus status) { auto ranges = serverKeys.intersectingRanges(range); ASSERT(!ranges.empty()); // at least the range is allKeys @@ -203,6 +227,22 @@ uint64_t MockStorageServer::sumRangeSize(KeyRangeRef range) const { return totalSize; } +void MockStorageServer::addActor(Future future) { + actors.add(future); +} + +void MockStorageServer::getSplitPoints(const SplitRangeRequest& req) {} + +Future MockStorageServer::waitMetricsTenantAware(const WaitMetricsRequest& req) { + return MockStorageServerImpl::waitMetricsTenantAware(this, req); +} + +void MockStorageServer::getStorageMetrics(const GetStorageMetricsRequest& req) {} + +Future MockStorageServer::run() { + return serveStorageMetricsRequests(this, ssi); +} + void MockGlobalState::initializeAsEmptyDatabaseMGS(const DatabaseConfiguration& conf, uint64_t defaultDiskSpace) { ASSERT(conf.storageTeamSize > 0); configuration = conf; @@ -544,3 +584,32 @@ TEST_CASE("/MockGlobalState/MockStorageServer/GetKeyLocations") { return Void(); } + +TEST_CASE("/MockGlobalState/MockStorageServer/WaitStorageMetricsRequest") { + BasicTestConfig testConfig; + testConfig.simpleConfig = true; + testConfig.minimumReplication = 1; + testConfig.logAntiQuorum = 0; + DatabaseConfiguration dbConfig = generateNormalDatabaseConfiguration(testConfig); + TraceEvent("UnitTestDbConfig").detail("Config", dbConfig.toString()); + + state std::shared_ptr mgs = std::make_shared(); + mgs->initializeAsEmptyDatabaseMGS(dbConfig); + state ActorCollection actors; + + ActorCollection* ptr = &actors; // get around ACTOR syntax restriction + std::for_each(mgs->allServers.begin(), mgs->allServers.end(), [ptr](auto& server) { + ptr->add(server.second.run()); + server.second.metrics.byteSample.sample.insert("something"_sr, 500000); + }); + + KeyRange testRange = allKeys; + ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack(); + std::pair, int> res = + wait(mgs->waitStorageMetrics(testRange, bounds.min, bounds.max, bounds.permittedError, 1, 1)); + // std::cout << "get result " << res.second << "\n"; + // std::cout << "get byte "<< res.first.get().bytes << "\n"; + ASSERT_EQ(res.second, -1); // the valid result always return -1, strange contraction though. + ASSERT_EQ(res.first.get().bytes, 500000); + return Void(); +} diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h index 2389dc0ab6..2e77d07459 100644 --- a/fdbserver/include/fdbserver/DataDistribution.actor.h +++ b/fdbserver/include/fdbserver/DataDistribution.actor.h @@ -476,6 +476,20 @@ struct ShardSizeBounds { bool operator==(ShardSizeBounds const& rhs) const { return max == rhs.max && min == rhs.min && permittedError == rhs.permittedError; } + + static ShardSizeBounds shardSizeBoundsBeforeTrack() { + return ShardSizeBounds{ + .max = StorageMetrics{ .bytes = -1, + .bytesPerKSecond = StorageMetrics::infinity, + .iosPerKSecond = StorageMetrics::infinity, + .bytesReadPerKSecond = StorageMetrics::infinity }, + .min = StorageMetrics{ .bytes = -1, .bytesPerKSecond = 0, .iosPerKSecond = 0, .bytesReadPerKSecond = 0 }, + .permittedError = StorageMetrics{ .bytes = -1, + .bytesPerKSecond = StorageMetrics::infinity, + .iosPerKSecond = StorageMetrics::infinity, + .bytesReadPerKSecond = StorageMetrics::infinity } + }; + } }; // Gets the permitted size and IO bounds for a shard diff --git a/fdbserver/include/fdbserver/MockGlobalState.h b/fdbserver/include/fdbserver/MockGlobalState.h index f3e5213892..a404f24027 100644 --- a/fdbserver/include/fdbserver/MockGlobalState.h +++ b/fdbserver/include/fdbserver/MockGlobalState.h @@ -52,9 +52,11 @@ inline bool isStatusTransitionValid(MockShardStatus from, MockShardStatus to) { return false; } -class MockStorageServer { +class MockStorageServer : public IStorageMetricsService { friend struct MockGlobalStateTester; + ActorCollection actors; + public: struct ShardInfo { MockShardStatus status; @@ -74,8 +76,6 @@ public: // size() and nthRange() would use the metrics as index instead KeyRangeMap serverKeys; - // sampled metrics - StorageServerMetrics metrics; CoalescedKeyRangeMap> byteSampleClears; StorageServerInterface ssi; // serve RPC requests @@ -104,6 +104,34 @@ public: uint64_t sumRangeSize(KeyRangeRef range) const; + void addActor(Future future) override; + + void getSplitPoints(SplitRangeRequest const& req) override; + + Future waitMetricsTenantAware(const WaitMetricsRequest& req) override; + + void getStorageMetrics(const GetStorageMetricsRequest& req) override; + + template + using isLoadBalancedReply = std::is_base_of; + + template + typename std::enable_if::value, void>::type + sendErrorWithPenalty(const ReplyPromise& promise, const Error& err, double penalty) { + Reply reply; + reply.error = err; + reply.penalty = penalty; + promise.send(reply); + } + + template + typename std::enable_if::value, void>::type + sendErrorWithPenalty(const ReplyPromise& promise, const Error& err, double) { + promise.sendError(err); + } + + Future run(); + protected: void threeWayShardSplitting(KeyRangeRef outerRange, KeyRangeRef innerRange,