add status verification; add skipDDModeCheck in getInitialDataDistribution for workload test

This commit is contained in:
Xiaoxi Wang 2022-10-13 12:42:11 -07:00
parent 1f8713df30
commit 83685e6a61
6 changed files with 83 additions and 39 deletions

View File

@ -240,7 +240,8 @@ class DDTxnProcessorImpl {
UID distributorId,
MoveKeysLock moveKeysLock,
std::vector<Optional<Key>> remoteDcIds,
const DDEnabledState* ddEnabledState) {
const DDEnabledState* ddEnabledState,
bool skipDDModeCheck = false) {
state Reference<InitialDataDistribution> result = makeReference<InitialDataDistribution>();
state Key beginKey = allKeys.begin;
@ -253,6 +254,7 @@ class DDTxnProcessorImpl {
state std::vector<std::pair<StorageServerInterface, ProcessClass>> tss_servers;
state int numDataMoves = 0;
CODE_PROBE(skipDDModeCheck, "DD Mode won't prevent read initial data distribution.");
// Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure
// causing entries to be duplicated
loop {
@ -285,7 +287,7 @@ class DDTxnProcessorImpl {
BinaryReader rd(mode.get(), Unversioned());
rd >> result->mode;
}
if (!result->mode || !ddEnabledState->isDDEnabled()) {
if ((!skipDDModeCheck && !result->mode) || !ddEnabledState->isDDEnabled()) {
// DD can be disabled persistently (result->mode = 0) or transiently (isDDEnabled() = 0)
TraceEvent(SevDebug, "GetInitialDataDistribution_DisabledDD").log();
return result;
@ -620,8 +622,10 @@ Future<Reference<InitialDataDistribution>> DDTxnProcessor::getInitialDataDistrib
const UID& distributorId,
const MoveKeysLock& moveKeysLock,
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState) {
return DDTxnProcessorImpl::getInitialDataDistribution(cx, distributorId, moveKeysLock, remoteDcIds, ddEnabledState);
const DDEnabledState* ddEnabledState,
bool skipDDModeCheck) {
return DDTxnProcessorImpl::getInitialDataDistribution(
cx, distributorId, moveKeysLock, remoteDcIds, ddEnabledState, skipDDModeCheck);
}
Future<Void> DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const {
@ -787,7 +791,8 @@ Future<Reference<InitialDataDistribution>> DDMockTxnProcessor::getInitialDataDis
const UID& distributorId,
const MoveKeysLock& moveKeysLock,
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState) {
const DDEnabledState* ddEnabledState,
bool skipDDModeCheck) {
// FIXME: now we just ignore ddEnabledState and moveKeysLock, will fix it in the future
Reference<InitialDataDistribution> res = makeReference<InitialDataDistribution>();
@ -847,7 +852,6 @@ void DDMockTxnProcessor::setupMockGlobalState(Reference<InitialDataDistribution>
mgs->shardMapping->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::Normal);
}
// FIXME: finish moveKeys implementation
Future<Void> DDMockTxnProcessor::moveKeys(const MoveKeysParams& params) {
// Not support location metadata yet
ASSERT(!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
@ -886,6 +890,7 @@ Future<std::vector<ProcessData>> DDMockTxnProcessor::getWorkers() const {
Future<Void> DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params,
std::map<UID, StorageServerInterface>& tssMapping) {
FlowLock::Releaser releaser(*params.startMoveKeysParallelismLock);
std::vector<ShardsAffectedByTeamFailure::Team> destTeams;
destTeams.emplace_back(params.destinationTeam, true);
mgs->shardMapping->moveShard(params.keys, destTeams);
@ -898,6 +903,8 @@ Future<Void> DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params,
Future<Void> DDMockTxnProcessor::rawFinishMovement(MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) {
FlowLock::Releaser releaser(*params.finishMoveKeysParallelismLock);
// get source and dest teams
auto [destTeams, srcTeams] = mgs->shardMapping->getTeamsFor(params.keys);

View File

@ -316,7 +316,8 @@ public:
ddId,
lock,
configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(),
context->ddEnabledState.get()));
context->ddEnabledState.get(),
false));
}
void initDcInfo() {

View File

@ -76,7 +76,7 @@ void MockStorageServer::threeWayShardSplitting(KeyRangeRef outerRange,
outerRangeSize - leftSize - SERVER_KNOBS->MIN_SHARD_BYTES + 1);
int rightSize = outerRangeSize - leftSize - midSize;
serverKeys.insert(innerRange, { MockShardStatus::UNSET, (uint64_t)midSize });
serverKeys.insert(innerRange, { serverKeys[left].status, (uint64_t)midSize });
serverKeys[left].shardSize = leftSize;
serverKeys[innerRange.end].shardSize = rightSize;
}
@ -89,7 +89,7 @@ void MockStorageServer::twoWayShardSplitting(KeyRangeRef range, KeyRef splitPoin
int leftSize =
deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, rangeSize - SERVER_KNOBS->MIN_SHARD_BYTES + 1);
int rightSize = rangeSize - leftSize;
serverKeys.rawInsert(splitPoint, { MockShardStatus::UNSET, (uint64_t)rightSize });
serverKeys.rawInsert(splitPoint, { serverKeys[left].status, (uint64_t)rightSize });
serverKeys[left].shardSize = leftSize;
}
@ -102,7 +102,7 @@ void MockStorageServer::removeShard(KeyRangeRef range) {
uint64_t MockStorageServer::sumRangeSize(KeyRangeRef range) const {
auto ranges = serverKeys.intersectingRanges(range);
uint64_t totalSize = 0;
for(auto it = ranges.begin(); it != ranges.end(); ++ it) {
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
totalSize += it->cvalue().shardSize;
}
return totalSize;
@ -194,6 +194,7 @@ struct MockGlobalStateTester {
auto it = mss.serverKeys.ranges().begin();
uint64_t oldSize =
deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, std::numeric_limits<int>::max());
MockShardStatus oldStatus = it.cvalue().status;
it->value().shardSize = oldSize;
KeyRangeRef outerRange = it->range();
Key x1 = keyAfter(it->range().begin);
@ -205,6 +206,7 @@ struct MockGlobalStateTester {
ASSERT(ranges.begin().range() == KeyRangeRef(outerRange.begin, x1));
ranges.pop_front();
ASSERT(ranges.begin().range() == KeyRangeRef(x1, x2));
ASSERT(ranges.begin().cvalue().status == oldStatus);
ranges.pop_front();
ASSERT(ranges.begin().range() == KeyRangeRef(x2, outerRange.end));
ranges.pop_front();
@ -214,6 +216,7 @@ struct MockGlobalStateTester {
// expectation [r0.begin, r0.end) => [r0.begin, x1), [x1, r0.end)
void testTwoWaySplitFirstRange(MockStorageServer& mss) {
auto it = mss.serverKeys.nthRange(0);
MockShardStatus oldStatus = it.cvalue().status;
uint64_t oldSize =
deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, std::numeric_limits<int>::max());
it->value().shardSize = oldSize;
@ -226,6 +229,7 @@ struct MockGlobalStateTester {
ASSERT(ranges.begin().range() == KeyRangeRef(outerRange.begin, x1));
ranges.pop_front();
ASSERT(ranges.begin().range() == KeyRangeRef(x1, outerRange.end));
ASSERT(ranges.begin().cvalue().status == oldStatus);
ranges.pop_front();
ASSERT(ranges.empty());
}
@ -247,7 +251,7 @@ TEST_CASE("/MockGlobalState/MockStorageServer/SplittingFunctions") {
std::cout << "Test 3-way splitting...\n";
tester.testThreeWaySplitFirstRange(mss);
std::cout << "Test 2-way splitting...\n";
mss.serverKeys.insert(allKeys, {MockShardStatus::COMPLETED, 0}); // reset to empty
mss.serverKeys.insert(allKeys, { MockShardStatus::COMPLETED, 0 }); // reset to empty
tester.testTwoWaySplitFirstRange(mss);
return Void();

View File

@ -70,7 +70,8 @@ public:
const UID& distributorId,
const MoveKeysLock& moveKeysLock,
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState) = 0;
const DDEnabledState* ddEnabledState,
bool skipDDModeCheck) = 0;
virtual ~IDDTxnProcessor() = default;
@ -140,7 +141,8 @@ public:
virtual Future<std::vector<ProcessData>> getWorkers() const = 0;
protected:
virtual Future<Void> rawStartMovement(MoveKeysParams& params, std::map<UID, StorageServerInterface>& tssMapping) = 0;
virtual Future<Void> rawStartMovement(MoveKeysParams& params,
std::map<UID, StorageServerInterface>& tssMapping) = 0;
virtual Future<Void> rawFinishMovement(MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) = 0;
@ -168,11 +170,11 @@ public:
// Call NativeAPI implementation directly
Future<ServerWorkerInfos> getServerListAndProcessClasses() override;
Future<Reference<InitialDataDistribution>> getInitialDataDistribution(
const UID& distributorId,
const MoveKeysLock& moveKeysLock,
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState) override;
Future<Reference<InitialDataDistribution>> getInitialDataDistribution(const UID& distributorId,
const MoveKeysLock& moveKeysLock,
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState,
bool skipDDModeCheck) override;
Future<MoveKeysLock> takeMoveKeysLock(UID const& ddId) const override;
@ -253,11 +255,11 @@ public:
Future<ServerWorkerInfos> getServerListAndProcessClasses() override;
Future<Reference<InitialDataDistribution>> getInitialDataDistribution(
const UID& distributorId,
const MoveKeysLock& moveKeysLock,
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState) override;
Future<Reference<InitialDataDistribution>> getInitialDataDistribution(const UID& distributorId,
const MoveKeysLock& moveKeysLock,
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState,
bool skipDDModeCheck) override;
Future<Void> removeKeysFromFailedServer(const UID& serverID,
const std::vector<UID>& teamForDroppedRange,

View File

@ -30,10 +30,11 @@
struct MockGlobalStateTester;
enum class MockShardStatus { UNSET = -1, EMPTY = 0, COMPLETED, INFLIGHT };
enum class MockShardStatus { EMPTY = 0, COMPLETED, INFLIGHT, UNSET };
class MockStorageServer {
friend struct MockGlobalStateTester;
public:
struct ShardInfo {
MockShardStatus status;

View File

@ -55,6 +55,8 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
static constexpr auto NAME = "IDDTxnProcessorApiCorrectness";
bool enabled;
double testDuration;
double meanDelay = 0.05;
double maxKeyspace = 0.1;
DDSharedContext ddContext;
std::shared_ptr<IDDTxnProcessor> real;
@ -66,6 +68,8 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
IDDTxnProcessorApiWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), ddContext(UID()) {
enabled = !clientId && g_network->isSimulated(); // only do this on the "first" client
testDuration = getOption(options, "testDuration"_sr, 10.0);
meanDelay = getOption(options, "meanDelay"_sr, meanDelay);
maxKeyspace = getOption(options, "maxKeyspace"_sr, maxKeyspace);
}
Future<Void> setup(Database const& cx) override { return enabled ? _setup(cx, this) : Void(); }
@ -76,21 +80,15 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
// real world key-server mappings. It's not harmful to leave other workload injection enabled for now, though.
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("RandomMoveKeys"); }
ACTOR Future<Void> _setup(Database cx, IDDTxnProcessorApiWorkload* self) {
self->real = std::make_shared<DDTxnProcessor>(cx);
// Get the database configuration so as to use proper team size
wait(store(self->ddContext.configuration, self->real->getDatabaseConfiguration()));
ASSERT(self->ddContext.configuration.storageTeamSize > 0);
// FIXME: add support for generating random teams across DCs
ASSERT_EQ(self->ddContext.usableRegions(), 1);
ACTOR static Future<Void> readRealInitialDataDistribution(IDDTxnProcessorApiWorkload* self) {
loop {
wait(store(self->ddContext.lock, ::readMoveKeysLock(cx)));
wait(store(self->ddContext.lock, ::readMoveKeysLock(self->real->context())));
// read real InitialDataDistribution
try {
wait(store(self->realInitDD,
self->real->getInitialDataDistribution(
self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get())));
wait(store(
self->realInitDD,
self->real->getInitialDataDistribution(
self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get(), true)));
std::cout << "Finish read real InitialDataDistribution: server size "
<< self->realInitDD->allServers.size() << ", shard size: " << self->realInitDD->shards.size()
<< std::endl;
@ -103,9 +101,29 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
return Void();
}
ACTOR Future<Void> _start(Database cx, IDDTxnProcessorApiWorkload* self) {
KeyRange getRandomKeys() const {
double len = deterministicRandom()->random01() * this->maxKeyspace;
double pos = deterministicRandom()->random01() * (1.0 - len);
return KeyRangeRef(doubleToTestKey(pos), doubleToTestKey(pos + len));
}
ACTOR Future<Void> _setup(Database cx, IDDTxnProcessorApiWorkload* self) {
int oldMode = wait(setDDMode(cx, 0));
TraceEvent("IDDTxnApiTestStartModeSetting").detail("OldValue", oldMode).log();
self->real = std::make_shared<DDTxnProcessor>(cx);
// Get the database configuration so as to use proper team size
wait(store(self->ddContext.configuration, self->real->getDatabaseConfiguration()));
ASSERT(self->ddContext.configuration.storageTeamSize > 0);
// FIXME: add support for generating random teams across DCs
ASSERT_EQ(self->ddContext.usableRegions(), 1);
wait(readRealInitialDataDistribution(self));
return Void();
}
ACTOR Future<Void> _start(Database cx, IDDTxnProcessorApiWorkload* self) {
self->mgs = std::make_shared<MockGlobalState>();
self->mgs->configuration = self->ddContext.configuration;
self->mock = std::make_shared<DDMockTxnProcessor>(self->mgs);
@ -114,7 +132,7 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
Reference<InitialDataDistribution> mockInitData =
self->mock
->getInitialDataDistribution(
self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get())
self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get(), true)
.get();
verifyInitDataEqual(self->realInitDD, mockInitData);
@ -129,11 +147,22 @@ struct IDDTxnProcessorApiWorkload : TestWorkload {
return Void();
}
// ACTOR Future<Void> worker(Database cx, IDDTxnProcessorApiWorkload* self) { return Void(); }
ACTOR Future<Void> worker(Database cx, IDDTxnProcessorApiWorkload* self) {
state KeyRangeMap<std::vector<StorageServerInterface>> inFlight;
state KeyRangeActorMap inFlightActors;
state double lastTime = now();
loop {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
// Keep trying to get the moveKeysLock
}
}
Future<bool> check(Database const& cx) override {
return tag(delay(testDuration / 2), true);
} // Give the database time to recover from our damage
void getMetrics(std::vector<PerfMetric>& m) override {}
};