diff --git a/fdbserver/DDTxnProcessor.actor.cpp b/fdbserver/DDTxnProcessor.actor.cpp index 35575d97f0..2e1f35e2c4 100644 --- a/fdbserver/DDTxnProcessor.actor.cpp +++ b/fdbserver/DDTxnProcessor.actor.cpp @@ -240,7 +240,8 @@ class DDTxnProcessorImpl { UID distributorId, MoveKeysLock moveKeysLock, std::vector> remoteDcIds, - const DDEnabledState* ddEnabledState) { + const DDEnabledState* ddEnabledState, + bool skipDDModeCheck = false) { state Reference result = makeReference(); state Key beginKey = allKeys.begin; @@ -253,6 +254,7 @@ class DDTxnProcessorImpl { state std::vector> tss_servers; state int numDataMoves = 0; + CODE_PROBE(skipDDModeCheck, "DD Mode won't prevent read initial data distribution."); // Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure // causing entries to be duplicated loop { @@ -285,7 +287,7 @@ class DDTxnProcessorImpl { BinaryReader rd(mode.get(), Unversioned()); rd >> result->mode; } - if (!result->mode || !ddEnabledState->isDDEnabled()) { + if ((!skipDDModeCheck && !result->mode) || !ddEnabledState->isDDEnabled()) { // DD can be disabled persistently (result->mode = 0) or transiently (isDDEnabled() = 0) TraceEvent(SevDebug, "GetInitialDataDistribution_DisabledDD").log(); return result; @@ -620,8 +622,10 @@ Future> DDTxnProcessor::getInitialDataDistrib const UID& distributorId, const MoveKeysLock& moveKeysLock, const std::vector>& remoteDcIds, - const DDEnabledState* ddEnabledState) { - return DDTxnProcessorImpl::getInitialDataDistribution(cx, distributorId, moveKeysLock, remoteDcIds, ddEnabledState); + const DDEnabledState* ddEnabledState, + bool skipDDModeCheck) { + return DDTxnProcessorImpl::getInitialDataDistribution( + cx, distributorId, moveKeysLock, remoteDcIds, ddEnabledState, skipDDModeCheck); } Future DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const { @@ -787,7 +791,8 @@ Future> DDMockTxnProcessor::getInitialDataDis const UID& distributorId, const MoveKeysLock& moveKeysLock, const std::vector>& remoteDcIds, - const DDEnabledState* ddEnabledState) { + const DDEnabledState* ddEnabledState, + bool skipDDModeCheck) { // FIXME: now we just ignore ddEnabledState and moveKeysLock, will fix it in the future Reference res = makeReference(); @@ -847,7 +852,6 @@ void DDMockTxnProcessor::setupMockGlobalState(Reference mgs->shardMapping->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::Normal); } -// FIXME: finish moveKeys implementation Future DDMockTxnProcessor::moveKeys(const MoveKeysParams& params) { // Not support location metadata yet ASSERT(!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA); @@ -886,6 +890,7 @@ Future> DDMockTxnProcessor::getWorkers() const { Future DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params, std::map& tssMapping) { + FlowLock::Releaser releaser(*params.startMoveKeysParallelismLock); std::vector destTeams; destTeams.emplace_back(params.destinationTeam, true); mgs->shardMapping->moveShard(params.keys, destTeams); @@ -898,6 +903,8 @@ Future DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params, Future DDMockTxnProcessor::rawFinishMovement(MoveKeysParams& params, const std::map& tssMapping) { + FlowLock::Releaser releaser(*params.finishMoveKeysParallelismLock); + // get source and dest teams auto [destTeams, srcTeams] = mgs->shardMapping->getTeamsFor(params.keys); diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index d6f8fb62cc..69da972919 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -316,7 +316,8 @@ public: ddId, lock, configuration.usableRegions > 1 ? remoteDcIds : std::vector>(), - context->ddEnabledState.get())); + context->ddEnabledState.get(), + false)); } void initDcInfo() { diff --git a/fdbserver/MockGlobalState.cpp b/fdbserver/MockGlobalState.cpp index 830ccb2170..7be726439f 100644 --- a/fdbserver/MockGlobalState.cpp +++ b/fdbserver/MockGlobalState.cpp @@ -76,7 +76,7 @@ void MockStorageServer::threeWayShardSplitting(KeyRangeRef outerRange, outerRangeSize - leftSize - SERVER_KNOBS->MIN_SHARD_BYTES + 1); int rightSize = outerRangeSize - leftSize - midSize; - serverKeys.insert(innerRange, { MockShardStatus::UNSET, (uint64_t)midSize }); + serverKeys.insert(innerRange, { serverKeys[left].status, (uint64_t)midSize }); serverKeys[left].shardSize = leftSize; serverKeys[innerRange.end].shardSize = rightSize; } @@ -89,7 +89,7 @@ void MockStorageServer::twoWayShardSplitting(KeyRangeRef range, KeyRef splitPoin int leftSize = deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, rangeSize - SERVER_KNOBS->MIN_SHARD_BYTES + 1); int rightSize = rangeSize - leftSize; - serverKeys.rawInsert(splitPoint, { MockShardStatus::UNSET, (uint64_t)rightSize }); + serverKeys.rawInsert(splitPoint, { serverKeys[left].status, (uint64_t)rightSize }); serverKeys[left].shardSize = leftSize; } @@ -102,7 +102,7 @@ void MockStorageServer::removeShard(KeyRangeRef range) { uint64_t MockStorageServer::sumRangeSize(KeyRangeRef range) const { auto ranges = serverKeys.intersectingRanges(range); uint64_t totalSize = 0; - for(auto it = ranges.begin(); it != ranges.end(); ++ it) { + for (auto it = ranges.begin(); it != ranges.end(); ++it) { totalSize += it->cvalue().shardSize; } return totalSize; @@ -194,6 +194,7 @@ struct MockGlobalStateTester { auto it = mss.serverKeys.ranges().begin(); uint64_t oldSize = deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, std::numeric_limits::max()); + MockShardStatus oldStatus = it.cvalue().status; it->value().shardSize = oldSize; KeyRangeRef outerRange = it->range(); Key x1 = keyAfter(it->range().begin); @@ -205,6 +206,7 @@ struct MockGlobalStateTester { ASSERT(ranges.begin().range() == KeyRangeRef(outerRange.begin, x1)); ranges.pop_front(); ASSERT(ranges.begin().range() == KeyRangeRef(x1, x2)); + ASSERT(ranges.begin().cvalue().status == oldStatus); ranges.pop_front(); ASSERT(ranges.begin().range() == KeyRangeRef(x2, outerRange.end)); ranges.pop_front(); @@ -214,6 +216,7 @@ struct MockGlobalStateTester { // expectation [r0.begin, r0.end) => [r0.begin, x1), [x1, r0.end) void testTwoWaySplitFirstRange(MockStorageServer& mss) { auto it = mss.serverKeys.nthRange(0); + MockShardStatus oldStatus = it.cvalue().status; uint64_t oldSize = deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, std::numeric_limits::max()); it->value().shardSize = oldSize; @@ -226,6 +229,7 @@ struct MockGlobalStateTester { ASSERT(ranges.begin().range() == KeyRangeRef(outerRange.begin, x1)); ranges.pop_front(); ASSERT(ranges.begin().range() == KeyRangeRef(x1, outerRange.end)); + ASSERT(ranges.begin().cvalue().status == oldStatus); ranges.pop_front(); ASSERT(ranges.empty()); } @@ -247,7 +251,7 @@ TEST_CASE("/MockGlobalState/MockStorageServer/SplittingFunctions") { std::cout << "Test 3-way splitting...\n"; tester.testThreeWaySplitFirstRange(mss); std::cout << "Test 2-way splitting...\n"; - mss.serverKeys.insert(allKeys, {MockShardStatus::COMPLETED, 0}); // reset to empty + mss.serverKeys.insert(allKeys, { MockShardStatus::COMPLETED, 0 }); // reset to empty tester.testTwoWaySplitFirstRange(mss); return Void(); diff --git a/fdbserver/include/fdbserver/DDTxnProcessor.h b/fdbserver/include/fdbserver/DDTxnProcessor.h index 7b58a8bcb6..a1ffb0ec81 100644 --- a/fdbserver/include/fdbserver/DDTxnProcessor.h +++ b/fdbserver/include/fdbserver/DDTxnProcessor.h @@ -70,7 +70,8 @@ public: const UID& distributorId, const MoveKeysLock& moveKeysLock, const std::vector>& remoteDcIds, - const DDEnabledState* ddEnabledState) = 0; + const DDEnabledState* ddEnabledState, + bool skipDDModeCheck) = 0; virtual ~IDDTxnProcessor() = default; @@ -140,7 +141,8 @@ public: virtual Future> getWorkers() const = 0; protected: - virtual Future rawStartMovement(MoveKeysParams& params, std::map& tssMapping) = 0; + virtual Future rawStartMovement(MoveKeysParams& params, + std::map& tssMapping) = 0; virtual Future rawFinishMovement(MoveKeysParams& params, const std::map& tssMapping) = 0; @@ -168,11 +170,11 @@ public: // Call NativeAPI implementation directly Future getServerListAndProcessClasses() override; - Future> getInitialDataDistribution( - const UID& distributorId, - const MoveKeysLock& moveKeysLock, - const std::vector>& remoteDcIds, - const DDEnabledState* ddEnabledState) override; + Future> getInitialDataDistribution(const UID& distributorId, + const MoveKeysLock& moveKeysLock, + const std::vector>& remoteDcIds, + const DDEnabledState* ddEnabledState, + bool skipDDModeCheck) override; Future takeMoveKeysLock(UID const& ddId) const override; @@ -253,11 +255,11 @@ public: Future getServerListAndProcessClasses() override; - Future> getInitialDataDistribution( - const UID& distributorId, - const MoveKeysLock& moveKeysLock, - const std::vector>& remoteDcIds, - const DDEnabledState* ddEnabledState) override; + Future> getInitialDataDistribution(const UID& distributorId, + const MoveKeysLock& moveKeysLock, + const std::vector>& remoteDcIds, + const DDEnabledState* ddEnabledState, + bool skipDDModeCheck) override; Future removeKeysFromFailedServer(const UID& serverID, const std::vector& teamForDroppedRange, diff --git a/fdbserver/include/fdbserver/MockGlobalState.h b/fdbserver/include/fdbserver/MockGlobalState.h index c74f06ad33..1a1af7355b 100644 --- a/fdbserver/include/fdbserver/MockGlobalState.h +++ b/fdbserver/include/fdbserver/MockGlobalState.h @@ -30,10 +30,11 @@ struct MockGlobalStateTester; -enum class MockShardStatus { UNSET = -1, EMPTY = 0, COMPLETED, INFLIGHT }; +enum class MockShardStatus { EMPTY = 0, COMPLETED, INFLIGHT, UNSET }; class MockStorageServer { friend struct MockGlobalStateTester; + public: struct ShardInfo { MockShardStatus status; diff --git a/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp b/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp index 7233a125ca..9b0b8a96fb 100644 --- a/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp +++ b/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp @@ -55,6 +55,8 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { static constexpr auto NAME = "IDDTxnProcessorApiCorrectness"; bool enabled; double testDuration; + double meanDelay = 0.05; + double maxKeyspace = 0.1; DDSharedContext ddContext; std::shared_ptr real; @@ -66,6 +68,8 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { IDDTxnProcessorApiWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), ddContext(UID()) { enabled = !clientId && g_network->isSimulated(); // only do this on the "first" client testDuration = getOption(options, "testDuration"_sr, 10.0); + meanDelay = getOption(options, "meanDelay"_sr, meanDelay); + maxKeyspace = getOption(options, "maxKeyspace"_sr, maxKeyspace); } Future setup(Database const& cx) override { return enabled ? _setup(cx, this) : Void(); } @@ -76,21 +80,15 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { // real world key-server mappings. It's not harmful to leave other workload injection enabled for now, though. void disableFailureInjectionWorkloads(std::set& out) const override { out.insert("RandomMoveKeys"); } - ACTOR Future _setup(Database cx, IDDTxnProcessorApiWorkload* self) { - self->real = std::make_shared(cx); - // Get the database configuration so as to use proper team size - wait(store(self->ddContext.configuration, self->real->getDatabaseConfiguration())); - ASSERT(self->ddContext.configuration.storageTeamSize > 0); - // FIXME: add support for generating random teams across DCs - ASSERT_EQ(self->ddContext.usableRegions(), 1); - + ACTOR static Future readRealInitialDataDistribution(IDDTxnProcessorApiWorkload* self) { loop { - wait(store(self->ddContext.lock, ::readMoveKeysLock(cx))); + wait(store(self->ddContext.lock, ::readMoveKeysLock(self->real->context()))); // read real InitialDataDistribution try { - wait(store(self->realInitDD, - self->real->getInitialDataDistribution( - self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get()))); + wait(store( + self->realInitDD, + self->real->getInitialDataDistribution( + self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get(), true))); std::cout << "Finish read real InitialDataDistribution: server size " << self->realInitDD->allServers.size() << ", shard size: " << self->realInitDD->shards.size() << std::endl; @@ -103,9 +101,29 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { return Void(); } - ACTOR Future _start(Database cx, IDDTxnProcessorApiWorkload* self) { + KeyRange getRandomKeys() const { + double len = deterministicRandom()->random01() * this->maxKeyspace; + double pos = deterministicRandom()->random01() * (1.0 - len); + return KeyRangeRef(doubleToTestKey(pos), doubleToTestKey(pos + len)); + } + + ACTOR Future _setup(Database cx, IDDTxnProcessorApiWorkload* self) { int oldMode = wait(setDDMode(cx, 0)); TraceEvent("IDDTxnApiTestStartModeSetting").detail("OldValue", oldMode).log(); + + self->real = std::make_shared(cx); + // Get the database configuration so as to use proper team size + wait(store(self->ddContext.configuration, self->real->getDatabaseConfiguration())); + ASSERT(self->ddContext.configuration.storageTeamSize > 0); + // FIXME: add support for generating random teams across DCs + ASSERT_EQ(self->ddContext.usableRegions(), 1); + wait(readRealInitialDataDistribution(self)); + + return Void(); + } + + ACTOR Future _start(Database cx, IDDTxnProcessorApiWorkload* self) { + self->mgs = std::make_shared(); self->mgs->configuration = self->ddContext.configuration; self->mock = std::make_shared(self->mgs); @@ -114,7 +132,7 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { Reference mockInitData = self->mock ->getInitialDataDistribution( - self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get()) + self->ddContext.id(), self->ddContext.lock, {}, self->ddContext.ddEnabledState.get(), true) .get(); verifyInitDataEqual(self->realInitDD, mockInitData); @@ -129,11 +147,22 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { return Void(); } - // ACTOR Future worker(Database cx, IDDTxnProcessorApiWorkload* self) { return Void(); } + ACTOR Future worker(Database cx, IDDTxnProcessorApiWorkload* self) { + state KeyRangeMap> inFlight; + state KeyRangeActorMap inFlightActors; + state double lastTime = now(); + + loop { + + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + // Keep trying to get the moveKeysLock + } + } Future check(Database const& cx) override { return tag(delay(testDuration / 2), true); } // Give the database time to recover from our damage + void getMetrics(std::vector& m) override {} };