implement MSS as IStorageMetricsService and pass the unit test
This commit is contained in:
parent
3c67b7df39
commit
0d4b4d05e2
|
@ -212,7 +212,7 @@ ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys,
|
||||||
const Reference<AsyncVar<Optional<ShardMetrics>>>& shardMetrics,
|
const Reference<AsyncVar<Optional<ShardMetrics>>>& shardMetrics,
|
||||||
const BandwidthStatus& bandwidthStatus,
|
const BandwidthStatus& bandwidthStatus,
|
||||||
PromiseStream<KeyRange> readHotShard) {
|
PromiseStream<KeyRange> readHotShard) {
|
||||||
ShardSizeBounds bounds;
|
ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack();
|
||||||
if (shardMetrics->get().present()) {
|
if (shardMetrics->get().present()) {
|
||||||
auto bytes = shardMetrics->get().get().metrics.bytes;
|
auto bytes = shardMetrics->get().get().metrics.bytes;
|
||||||
auto readBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics);
|
auto readBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics);
|
||||||
|
@ -259,21 +259,7 @@ ShardSizeBounds calculateShardSizeBounds(const KeyRange& keys,
|
||||||
} else {
|
} else {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
bounds.max.bytes = -1;
|
|
||||||
bounds.min.bytes = -1;
|
|
||||||
bounds.permittedError.bytes = -1;
|
|
||||||
bounds.max.bytesPerKSecond = bounds.max.infinity;
|
|
||||||
bounds.min.bytesPerKSecond = 0;
|
|
||||||
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
|
|
||||||
bounds.max.bytesReadPerKSecond = bounds.max.infinity;
|
|
||||||
bounds.min.bytesReadPerKSecond = 0;
|
|
||||||
bounds.permittedError.bytesReadPerKSecond = bounds.permittedError.infinity;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bounds.max.iosPerKSecond = bounds.max.infinity;
|
|
||||||
bounds.min.iosPerKSecond = 0;
|
|
||||||
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
|
|
||||||
return bounds;
|
return bounds;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
#include "fdbserver/MockGlobalState.h"
|
#include "fdbserver/MockGlobalState.h"
|
||||||
#include "fdbserver/workloads/workloads.actor.h"
|
#include "fdbserver/workloads/workloads.actor.h"
|
||||||
|
#include "fdbserver/DataDistribution.actor.h"
|
||||||
#include "flow/actorcompiler.h"
|
#include "flow/actorcompiler.h"
|
||||||
|
|
||||||
class MockGlobalStateImpl {
|
class MockGlobalStateImpl {
|
||||||
|
@ -42,6 +43,7 @@ public:
|
||||||
UseProvisionalProxies::False,
|
UseProvisionalProxies::False,
|
||||||
0)
|
0)
|
||||||
.get();
|
.get();
|
||||||
|
TraceEvent(SevDebug, "MGSWaitStorageMetrics").detail("Phase", "GetLocation");
|
||||||
// NOTE(xwang): in native API, there's code handling the non-equal situation, but I think in mock world
|
// NOTE(xwang): in native API, there's code handling the non-equal situation, but I think in mock world
|
||||||
// there shouldn't have any delay to update the locations.
|
// there shouldn't have any delay to update the locations.
|
||||||
ASSERT_EQ(expectedShardCount, locations.size());
|
ASSERT_EQ(expectedShardCount, locations.size());
|
||||||
|
@ -92,6 +94,28 @@ public:
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class MockStorageServerImpl {
|
||||||
|
public:
|
||||||
|
ACTOR static Future<Void> waitMetricsTenantAware(MockStorageServer* self, WaitMetricsRequest req) {
|
||||||
|
if (req.tenantInfo.present() && req.tenantInfo.get().tenantId != TenantInfo::INVALID_TENANT) {
|
||||||
|
// TODO(xwang) add support for tenant test, search for tenant entry
|
||||||
|
Optional<TenantMapEntry> entry;
|
||||||
|
Optional<Key> tenantPrefix = entry.map<Key>([](TenantMapEntry e) { return e.prefix; });
|
||||||
|
if (tenantPrefix.present()) {
|
||||||
|
UNREACHABLE();
|
||||||
|
// req.keys = req.keys.withPrefix(tenantPrefix.get(), req.arena);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!self->isReadable(req.keys)) {
|
||||||
|
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
|
||||||
|
} else {
|
||||||
|
wait(self->metrics.waitMetrics(req, delayJittered(SERVER_KNOBS->STORAGE_METRIC_TIMEOUT)));
|
||||||
|
}
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
bool MockStorageServer::allShardStatusEqual(KeyRangeRef range, MockShardStatus status) {
|
bool MockStorageServer::allShardStatusEqual(KeyRangeRef range, MockShardStatus status) {
|
||||||
auto ranges = serverKeys.intersectingRanges(range);
|
auto ranges = serverKeys.intersectingRanges(range);
|
||||||
ASSERT(!ranges.empty()); // at least the range is allKeys
|
ASSERT(!ranges.empty()); // at least the range is allKeys
|
||||||
|
@ -203,6 +227,22 @@ uint64_t MockStorageServer::sumRangeSize(KeyRangeRef range) const {
|
||||||
return totalSize;
|
return totalSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MockStorageServer::addActor(Future<Void> future) {
|
||||||
|
actors.add(future);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MockStorageServer::getSplitPoints(const SplitRangeRequest& req) {}
|
||||||
|
|
||||||
|
Future<Void> MockStorageServer::waitMetricsTenantAware(const WaitMetricsRequest& req) {
|
||||||
|
return MockStorageServerImpl::waitMetricsTenantAware(this, req);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MockStorageServer::getStorageMetrics(const GetStorageMetricsRequest& req) {}
|
||||||
|
|
||||||
|
Future<Void> MockStorageServer::run() {
|
||||||
|
return serveStorageMetricsRequests(this, ssi);
|
||||||
|
}
|
||||||
|
|
||||||
void MockGlobalState::initializeAsEmptyDatabaseMGS(const DatabaseConfiguration& conf, uint64_t defaultDiskSpace) {
|
void MockGlobalState::initializeAsEmptyDatabaseMGS(const DatabaseConfiguration& conf, uint64_t defaultDiskSpace) {
|
||||||
ASSERT(conf.storageTeamSize > 0);
|
ASSERT(conf.storageTeamSize > 0);
|
||||||
configuration = conf;
|
configuration = conf;
|
||||||
|
@ -544,3 +584,32 @@ TEST_CASE("/MockGlobalState/MockStorageServer/GetKeyLocations") {
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("/MockGlobalState/MockStorageServer/WaitStorageMetricsRequest") {
|
||||||
|
BasicTestConfig testConfig;
|
||||||
|
testConfig.simpleConfig = true;
|
||||||
|
testConfig.minimumReplication = 1;
|
||||||
|
testConfig.logAntiQuorum = 0;
|
||||||
|
DatabaseConfiguration dbConfig = generateNormalDatabaseConfiguration(testConfig);
|
||||||
|
TraceEvent("UnitTestDbConfig").detail("Config", dbConfig.toString());
|
||||||
|
|
||||||
|
state std::shared_ptr<MockGlobalState> mgs = std::make_shared<MockGlobalState>();
|
||||||
|
mgs->initializeAsEmptyDatabaseMGS(dbConfig);
|
||||||
|
state ActorCollection actors;
|
||||||
|
|
||||||
|
ActorCollection* ptr = &actors; // get around ACTOR syntax restriction
|
||||||
|
std::for_each(mgs->allServers.begin(), mgs->allServers.end(), [ptr](auto& server) {
|
||||||
|
ptr->add(server.second.run());
|
||||||
|
server.second.metrics.byteSample.sample.insert("something"_sr, 500000);
|
||||||
|
});
|
||||||
|
|
||||||
|
KeyRange testRange = allKeys;
|
||||||
|
ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack();
|
||||||
|
std::pair<Optional<StorageMetrics>, int> res =
|
||||||
|
wait(mgs->waitStorageMetrics(testRange, bounds.min, bounds.max, bounds.permittedError, 1, 1));
|
||||||
|
// std::cout << "get result " << res.second << "\n";
|
||||||
|
// std::cout << "get byte "<< res.first.get().bytes << "\n";
|
||||||
|
ASSERT_EQ(res.second, -1); // the valid result always return -1, strange contraction though.
|
||||||
|
ASSERT_EQ(res.first.get().bytes, 500000);
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
|
@ -476,6 +476,20 @@ struct ShardSizeBounds {
|
||||||
bool operator==(ShardSizeBounds const& rhs) const {
|
bool operator==(ShardSizeBounds const& rhs) const {
|
||||||
return max == rhs.max && min == rhs.min && permittedError == rhs.permittedError;
|
return max == rhs.max && min == rhs.min && permittedError == rhs.permittedError;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ShardSizeBounds shardSizeBoundsBeforeTrack() {
|
||||||
|
return ShardSizeBounds{
|
||||||
|
.max = StorageMetrics{ .bytes = -1,
|
||||||
|
.bytesPerKSecond = StorageMetrics::infinity,
|
||||||
|
.iosPerKSecond = StorageMetrics::infinity,
|
||||||
|
.bytesReadPerKSecond = StorageMetrics::infinity },
|
||||||
|
.min = StorageMetrics{ .bytes = -1, .bytesPerKSecond = 0, .iosPerKSecond = 0, .bytesReadPerKSecond = 0 },
|
||||||
|
.permittedError = StorageMetrics{ .bytes = -1,
|
||||||
|
.bytesPerKSecond = StorageMetrics::infinity,
|
||||||
|
.iosPerKSecond = StorageMetrics::infinity,
|
||||||
|
.bytesReadPerKSecond = StorageMetrics::infinity }
|
||||||
|
};
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Gets the permitted size and IO bounds for a shard
|
// Gets the permitted size and IO bounds for a shard
|
||||||
|
|
|
@ -52,9 +52,11 @@ inline bool isStatusTransitionValid(MockShardStatus from, MockShardStatus to) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
class MockStorageServer {
|
class MockStorageServer : public IStorageMetricsService {
|
||||||
friend struct MockGlobalStateTester;
|
friend struct MockGlobalStateTester;
|
||||||
|
|
||||||
|
ActorCollection actors;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
struct ShardInfo {
|
struct ShardInfo {
|
||||||
MockShardStatus status;
|
MockShardStatus status;
|
||||||
|
@ -74,8 +76,6 @@ public:
|
||||||
// size() and nthRange() would use the metrics as index instead
|
// size() and nthRange() would use the metrics as index instead
|
||||||
KeyRangeMap<ShardInfo> serverKeys;
|
KeyRangeMap<ShardInfo> serverKeys;
|
||||||
|
|
||||||
// sampled metrics
|
|
||||||
StorageServerMetrics metrics;
|
|
||||||
CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
|
CoalescedKeyRangeMap<bool, int64_t, KeyBytesMetric<int64_t>> byteSampleClears;
|
||||||
|
|
||||||
StorageServerInterface ssi; // serve RPC requests
|
StorageServerInterface ssi; // serve RPC requests
|
||||||
|
@ -104,6 +104,34 @@ public:
|
||||||
|
|
||||||
uint64_t sumRangeSize(KeyRangeRef range) const;
|
uint64_t sumRangeSize(KeyRangeRef range) const;
|
||||||
|
|
||||||
|
void addActor(Future<Void> future) override;
|
||||||
|
|
||||||
|
void getSplitPoints(SplitRangeRequest const& req) override;
|
||||||
|
|
||||||
|
Future<Void> waitMetricsTenantAware(const WaitMetricsRequest& req) override;
|
||||||
|
|
||||||
|
void getStorageMetrics(const GetStorageMetricsRequest& req) override;
|
||||||
|
|
||||||
|
template <class Reply>
|
||||||
|
using isLoadBalancedReply = std::is_base_of<LoadBalancedReply, Reply>;
|
||||||
|
|
||||||
|
template <class Reply>
|
||||||
|
typename std::enable_if<isLoadBalancedReply<Reply>::value, void>::type
|
||||||
|
sendErrorWithPenalty(const ReplyPromise<Reply>& promise, const Error& err, double penalty) {
|
||||||
|
Reply reply;
|
||||||
|
reply.error = err;
|
||||||
|
reply.penalty = penalty;
|
||||||
|
promise.send(reply);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class Reply>
|
||||||
|
typename std::enable_if<!isLoadBalancedReply<Reply>::value, void>::type
|
||||||
|
sendErrorWithPenalty(const ReplyPromise<Reply>& promise, const Error& err, double) {
|
||||||
|
promise.sendError(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Void> run();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void threeWayShardSplitting(KeyRangeRef outerRange,
|
void threeWayShardSplitting(KeyRangeRef outerRange,
|
||||||
KeyRangeRef innerRange,
|
KeyRangeRef innerRange,
|
||||||
|
|
Loading…
Reference in New Issue