update data ops definition and comments; add a unit test
This commit is contained in:
parent
55a3db82b5
commit
004a0f8915
|
@ -141,7 +141,7 @@ public:
|
|||
int maxSize = std::min(remainBytes, 130000) + 1;
|
||||
int randomSize = deterministicRandom()->randomInt(lastKey.size(), maxSize);
|
||||
|
||||
self->availableDiskSpace -= randomSize;
|
||||
self->usedDiskSpace += randomSize;
|
||||
self->byteSampleApplySet(lastKey, randomSize);
|
||||
remainBytes -= randomSize;
|
||||
lastKey = randomKeyBetween(KeyRangeRef(lastKey, params.keys.end));
|
||||
|
@ -267,7 +267,7 @@ void MockStorageServer::removeShard(KeyRangeRef range) {
|
|||
auto ranges = serverKeys.containedRanges(range);
|
||||
ASSERT(ranges.begin().range() == range);
|
||||
auto rangeSize = sumRangeSize(range);
|
||||
availableDiskSpace += rangeSize;
|
||||
usedDiskSpace -= rangeSize;
|
||||
serverKeys.rawErase(range);
|
||||
byteSampleApplyClear(range);
|
||||
metrics.notifyNotReadable(range);
|
||||
|
@ -310,8 +310,8 @@ Future<Void> MockStorageServer::run() {
|
|||
void MockStorageServer::set(KeyRef key, int64_t bytes, int64_t oldBytes) {
|
||||
notifyWriteMetrics(key, bytes);
|
||||
byteSampleApplySet(key, bytes);
|
||||
auto delta = oldBytes - bytes;
|
||||
availableDiskSpace += delta;
|
||||
auto delta = bytes - oldBytes;
|
||||
usedDiskSpace += delta;
|
||||
serverKeys[key].shardSize += delta;
|
||||
}
|
||||
|
||||
|
@ -319,16 +319,17 @@ void MockStorageServer::clear(KeyRef key, int64_t bytes) {
|
|||
notifyWriteMetrics(key, bytes);
|
||||
KeyRange sr = singleKeyRange(key);
|
||||
byteSampleApplyClear(sr);
|
||||
availableDiskSpace += bytes;
|
||||
usedDiskSpace -= bytes;
|
||||
serverKeys[key].shardSize -= bytes;
|
||||
}
|
||||
|
||||
void MockStorageServer::clearRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
int64_t MockStorageServer::clearRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
notifyWriteMetrics(range.begin, range.begin.size() + range.end.size());
|
||||
byteSampleApplyClear(range);
|
||||
auto totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes);
|
||||
availableDiskSpace += totalByteSize;
|
||||
usedDiskSpace -= totalByteSize;
|
||||
clearRangeTotalBytes(range, beginShardBytes, endShardBytes);
|
||||
return totalByteSize;
|
||||
}
|
||||
|
||||
void MockStorageServer::get(KeyRef key, int64_t bytes) {
|
||||
|
@ -337,8 +338,8 @@ void MockStorageServer::get(KeyRef key, int64_t bytes) {
|
|||
metrics.notifyBytesReadPerKSecond(key, bytesReadPerKSecond);
|
||||
}
|
||||
|
||||
void MockStorageServer::getRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
auto totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes);
|
||||
int64_t MockStorageServer::getRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
int64_t totalByteSize = estimateRangeTotalBytes(range, beginShardBytes, endShardBytes);
|
||||
// For performance concerns, the cost of a range read is billed to the start key and end key of the
|
||||
// range.
|
||||
if (totalByteSize > 0) {
|
||||
|
@ -346,6 +347,7 @@ void MockStorageServer::getRange(KeyRangeRef range, int64_t beginShardBytes, int
|
|||
metrics.notifyBytesReadPerKSecond(range.begin, bytesReadPerKSecond);
|
||||
metrics.notifyBytesReadPerKSecond(range.end, bytesReadPerKSecond);
|
||||
}
|
||||
return totalByteSize;
|
||||
}
|
||||
|
||||
int64_t MockStorageServer::estimateRangeTotalBytes(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes) {
|
||||
|
@ -609,6 +611,107 @@ Future<Standalone<VectorRef<KeyRef>>> MockGlobalState::splitStorageMetrics(const
|
|||
return MockGlobalStateImpl::splitStorageMetrics(this, keys, limit, estimated, minSplitBytes);
|
||||
}
|
||||
|
||||
std::vector<Future<Void>> MockGlobalState::runAllMockServers() {
|
||||
std::vector<Future<Void>> futures;
|
||||
futures.reserve(allServers.size());
|
||||
for (auto& [id, _] : allServers) {
|
||||
futures.emplace_back(runMockServer(id));
|
||||
}
|
||||
return futures;
|
||||
}
|
||||
Future<Void> MockGlobalState::runMockServer(const UID& id) {
|
||||
auto& server = allServers.at(id);
|
||||
IFailureMonitor::failureMonitor().setStatus(server.ssi.address(), FailureStatus(false));
|
||||
return server.run();
|
||||
}
|
||||
|
||||
int64_t MockGlobalState::get(KeyRef key) {
|
||||
auto ids = shardMapping->getSourceServerIdsFor(key);
|
||||
int64_t randomBytes = 0;
|
||||
if (deterministicRandom()->random01() > emptyProb) {
|
||||
randomBytes = deterministicRandom()->randomInt64(minByteSize, maxByteSize);
|
||||
}
|
||||
// randomly choose 1 server
|
||||
auto id = deterministicRandom()->randomChoice(ids);
|
||||
allServers.at(id).get(key, randomBytes);
|
||||
return randomBytes;
|
||||
}
|
||||
|
||||
int64_t MockGlobalState::getRange(KeyRangeRef range) {
|
||||
auto ranges = shardMapping->intersectingRanges(range);
|
||||
int64_t totalSize = 0;
|
||||
KeyRef begin, end;
|
||||
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
|
||||
auto ids = shardMapping->getSourceServerIdsFor(it->begin());
|
||||
if (range.begin > it->begin()) {
|
||||
begin = range.begin;
|
||||
}
|
||||
if (range.end < it->end()) {
|
||||
end = range.end;
|
||||
}
|
||||
|
||||
// randomly choose 1 server
|
||||
auto id = deterministicRandom()->randomChoice(ids);
|
||||
int64_t beginSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES),
|
||||
endSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES);
|
||||
totalSize += allServers.at(id).getRange(KeyRangeRef(begin, end), beginSize, endSize);
|
||||
}
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
int64_t MockGlobalState::set(KeyRef key, int valueSize, bool insert) {
|
||||
auto ids = shardMapping->getSourceServerIdsFor(key);
|
||||
int64_t oldKvBytes = 0;
|
||||
insert |= (deterministicRandom()->random01() < emptyProb);
|
||||
|
||||
if (!insert) {
|
||||
oldKvBytes = key.size() + deterministicRandom()->randomInt64(minByteSize, maxByteSize);
|
||||
}
|
||||
|
||||
for (auto& id : ids) {
|
||||
allServers.at(id).set(key, valueSize + key.size(), oldKvBytes);
|
||||
}
|
||||
return oldKvBytes;
|
||||
}
|
||||
|
||||
int64_t MockGlobalState::clear(KeyRef key) {
|
||||
auto ids = shardMapping->getSourceServerIdsFor(key);
|
||||
int64_t randomBytes = 0;
|
||||
if (deterministicRandom()->random01() > emptyProb) {
|
||||
randomBytes = deterministicRandom()->randomInt64(minByteSize, maxByteSize) + key.size();
|
||||
}
|
||||
|
||||
for (auto& id : ids) {
|
||||
allServers.at(id).clear(key, randomBytes);
|
||||
}
|
||||
return randomBytes;
|
||||
}
|
||||
|
||||
int64_t MockGlobalState::clearRange(KeyRangeRef range) {
|
||||
auto ranges = shardMapping->intersectingRanges(range);
|
||||
int64_t totalSize = 0;
|
||||
KeyRef begin, end;
|
||||
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
|
||||
auto ids = shardMapping->getSourceServerIdsFor(it->begin());
|
||||
if (range.begin > it->begin()) {
|
||||
begin = range.begin;
|
||||
}
|
||||
if (range.end < it->end()) {
|
||||
end = range.end;
|
||||
}
|
||||
|
||||
int64_t beginSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES),
|
||||
endSize = deterministicRandom()->randomInt64(0, SERVER_KNOBS->MIN_SHARD_BYTES);
|
||||
int64_t lastSize = -1;
|
||||
for (auto& id : ids) {
|
||||
int64_t size = allServers.at(id).clearRange(KeyRangeRef(begin, end), beginSize, endSize);
|
||||
ASSERT(lastSize == size || lastSize == -1); // every server should return the same result
|
||||
}
|
||||
totalSize += lastSize;
|
||||
}
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
TEST_CASE("/MockGlobalState/initializeAsEmptyDatabaseMGS/SimpleThree") {
|
||||
BasicTestConfig testConfig;
|
||||
testConfig.simpleConfig = true;
|
||||
|
@ -803,15 +906,12 @@ TEST_CASE("/MockGlobalState/MockStorageServer/WaitStorageMetricsRequest") {
|
|||
|
||||
state std::shared_ptr<MockGlobalState> mgs = std::make_shared<MockGlobalState>();
|
||||
mgs->initializeAsEmptyDatabaseMGS(dbConfig);
|
||||
state ActorCollection actors;
|
||||
|
||||
ActorCollection* ptr = &actors; // get around ACTOR syntax restriction
|
||||
std::for_each(mgs->allServers.begin(), mgs->allServers.end(), [ptr](auto& server) {
|
||||
ptr->add(server.second.run());
|
||||
IFailureMonitor::failureMonitor().setStatus(server.second.ssi.address(), FailureStatus(false));
|
||||
std::for_each(mgs->allServers.begin(), mgs->allServers.end(), [](auto& server) {
|
||||
server.second.metrics.byteSample.sample.insert("something"_sr, 500000);
|
||||
});
|
||||
|
||||
state Future<Void> allServerFutures = waitForAll(mgs->runAllMockServers());
|
||||
|
||||
KeyRange testRange = allKeys;
|
||||
ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack();
|
||||
std::pair<Optional<StorageMetrics>, int> res =
|
||||
|
@ -822,3 +922,32 @@ TEST_CASE("/MockGlobalState/MockStorageServer/WaitStorageMetricsRequest") {
|
|||
ASSERT_EQ(res.first.get().bytes, 500000);
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/MockGlobalState/MockStorageServer/DataOpsSet") {
|
||||
BasicTestConfig testConfig;
|
||||
testConfig.simpleConfig = true;
|
||||
testConfig.minimumReplication = 1;
|
||||
testConfig.logAntiQuorum = 0;
|
||||
DatabaseConfiguration dbConfig = generateNormalDatabaseConfiguration(testConfig);
|
||||
TraceEvent("DataOpsUnitTestConfig").detail("Config", dbConfig.toString());
|
||||
state std::shared_ptr<MockGlobalState> mgs = std::make_shared<MockGlobalState>();
|
||||
mgs->initializeAsEmptyDatabaseMGS(dbConfig);
|
||||
state Future<Void> allServerFutures = waitForAll(mgs->runAllMockServers());
|
||||
|
||||
// use data ops
|
||||
state int64_t setBytes = 0;
|
||||
setBytes += mgs->set("a"_sr, 1 * SERVER_KNOBS->BYTES_WRITE_UNITS_PER_SAMPLE, true);
|
||||
setBytes += mgs->set("b"_sr, 2 * SERVER_KNOBS->BYTES_WRITE_UNITS_PER_SAMPLE, true);
|
||||
setBytes += mgs->set("c"_sr, 3 * SERVER_KNOBS->BYTES_WRITE_UNITS_PER_SAMPLE, true);
|
||||
for (auto& server : mgs->allServers) {
|
||||
ASSERT_EQ(server.second.sumRangeSize(KeyRangeRef("a"_sr, "c"_sr)),
|
||||
2 + 3 * SERVER_KNOBS->BYTES_WRITE_UNITS_PER_SAMPLE);
|
||||
ASSERT_EQ(server.second.usedDiskSpace, 3 + 6 * SERVER_KNOBS->BYTES_WRITE_UNITS_PER_SAMPLE);
|
||||
}
|
||||
ShardSizeBounds bounds = ShardSizeBounds::shardSizeBoundsBeforeTrack();
|
||||
std::pair<Optional<StorageMetrics>, int> res =
|
||||
wait(mgs->waitStorageMetrics(allKeys, bounds.min, bounds.max, bounds.permittedError, 1, 1));
|
||||
std::cout << "get result " << res.second << "\n";
|
||||
std::cout << "get byte " << res.first.get().bytes << " " << setBytes << "\n";
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -246,3 +246,13 @@ void ShardsAffectedByTeamFailure::removeFailedServerForRange(KeyRangeRef keys, c
|
|||
auto ShardsAffectedByTeamFailure::intersectingRanges(KeyRangeRef keyRange) const -> decltype(shard_teams)::ConstRanges {
|
||||
return shard_teams.intersectingRanges(keyRange);
|
||||
}
|
||||
|
||||
std::vector<UID> ShardsAffectedByTeamFailure::getSourceServerIdsFor(KeyRef key) {
|
||||
auto teamPair = getTeamsFor(key);
|
||||
std::set<UID> res;
|
||||
auto& srcTeams = teamPair.second.empty() ? teamPair.first : teamPair.second;
|
||||
for (auto& team : srcTeams) {
|
||||
res.insert(team.servers.begin(), team.servers.end());
|
||||
}
|
||||
return std::vector<UID>(res.begin(), res.end());
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ public:
|
|||
static constexpr uint64_t DEFAULT_DISK_SPACE = 1000LL * 1024 * 1024 * 1024;
|
||||
|
||||
// control plane statistics associated with a real storage server
|
||||
uint64_t totalDiskSpace = DEFAULT_DISK_SPACE, availableDiskSpace = DEFAULT_DISK_SPACE;
|
||||
uint64_t totalDiskSpace = DEFAULT_DISK_SPACE, usedDiskSpace = DEFAULT_DISK_SPACE;
|
||||
|
||||
// In-memory counterpart of the `serverKeys` in system keyspace
|
||||
// the value ShardStatus is [InFlight, Completed, Empty] and metrics uint64_t is the shard size, the caveat is the
|
||||
|
@ -96,8 +96,7 @@ public:
|
|||
MockStorageServer() = default;
|
||||
|
||||
MockStorageServer(StorageServerInterface ssi, uint64_t availableDiskSpace, uint64_t usedDiskSpace = 0)
|
||||
: totalDiskSpace(usedDiskSpace + availableDiskSpace), availableDiskSpace(availableDiskSpace), ssi(ssi),
|
||||
id(ssi.id()) {}
|
||||
: totalDiskSpace(usedDiskSpace + availableDiskSpace), usedDiskSpace(usedDiskSpace), ssi(ssi), id(ssi.id()) {}
|
||||
|
||||
MockStorageServer(const UID& id, uint64_t availableDiskSpace, uint64_t usedDiskSpace = 0)
|
||||
: MockStorageServer(StorageServerInterface(id), availableDiskSpace, usedDiskSpace) {}
|
||||
|
@ -154,13 +153,15 @@ public:
|
|||
// Clear key and its value of which the size is bytes
|
||||
void clear(KeyRef key, int64_t bytes);
|
||||
// Clear range, assuming the first and last shard within the range having size `beginShardBytes` and `endShardBytes`
|
||||
void clearRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
// return the total range size
|
||||
int64_t clearRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
|
||||
// modify the metrics as like doing an n-bytes read op
|
||||
// Read key and cause bytes read overhead
|
||||
void get(KeyRef key, int64_t bytes);
|
||||
// Read range, assuming the first and last shard within the range having size `beginShardBytes` and `endShardBytes`
|
||||
void getRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
// Read range, assuming the first and last shard within the range having size `beginShardBytes` and `endShardBytes`,
|
||||
// return the total range size;
|
||||
int64_t getRange(KeyRangeRef range, int64_t beginShardBytes, int64_t endShardBytes);
|
||||
|
||||
// trigger the asynchronous fetch keys operation
|
||||
void signalFetchKeys(KeyRangeRef range, int64_t rangeTotalBytes);
|
||||
|
@ -280,6 +281,26 @@ public:
|
|||
Optional<UID> debugID,
|
||||
UseProvisionalProxies useProvisionalProxies,
|
||||
Version version) override;
|
||||
|
||||
// data ops
|
||||
// MGS finds the shard X contains this key, randomly generates a N-bytes read operation on that shard, which may
|
||||
// change the read sampling stats of shard X. return the random size of value
|
||||
int64_t get(KeyRef key);
|
||||
// For the edge shards contains the range boundaries, randomly do N1 byte and N2 byte read operations. For other
|
||||
// shards fully within the range, mock a full shard read op.
|
||||
int64_t getRange(KeyRangeRef range);
|
||||
// MGS finds the shard X contains this key, mock an N-bytes write to shard X, where N = valueSize + key.size().
|
||||
// Return a random number representing the old kv size
|
||||
int64_t set(KeyRef key, int valueSize, bool insert);
|
||||
// MGS finds the shard X contains this key, randomly generate an N-byte clear operation.
|
||||
// Return a random number representing the old kv size
|
||||
int64_t clear(KeyRef key);
|
||||
// Similar as getRange, but need to change shardTotalBytes because this is a clear operation.
|
||||
int64_t clearRange(KeyRangeRef range);
|
||||
|
||||
// convenient shortcuts for test
|
||||
std::vector<Future<Void>> runAllMockServers();
|
||||
Future<Void> runMockServer(const UID& id);
|
||||
};
|
||||
|
||||
#endif // FOUNDATIONDB_MOCKGLOBALSTATE_H
|
||||
|
|
|
@ -86,6 +86,8 @@ public:
|
|||
|
||||
std::pair<std::vector<Team>, std::vector<Team>> getTeamsFor(KeyRef key);
|
||||
|
||||
std::vector<UID> getSourceServerIdsFor(KeyRef key);
|
||||
|
||||
// Shard boundaries are modified in defineShard and the content of what servers correspond to each shard is a copy
|
||||
// or union of the shards already there
|
||||
void defineShard(KeyRangeRef keys);
|
||||
|
|
Loading…
Reference in New Issue