diff --git a/fdbserver/DDTxnProcessor.actor.cpp b/fdbserver/DDTxnProcessor.actor.cpp index 78e663fceb..9690f7afda 100644 --- a/fdbserver/DDTxnProcessor.actor.cpp +++ b/fdbserver/DDTxnProcessor.actor.cpp @@ -705,12 +705,12 @@ struct DDMockTxnProcessorImpl { std::sort(params.destinationTeam.begin(), params.destinationTeam.end()); std::sort(params.healthyDestinations.begin(), params.healthyDestinations.end()); - self->rawStartMovement(params, tssMapping); + wait(self->rawStartMovement(params, tssMapping)); ASSERT(tssMapping.empty()); wait(checkFetchingState(self, params.destinationTeam, params.keys)); - self->rawFinishMovement(params, tssMapping); + wait(self->rawFinishMovement(params, tssMapping)); if (!params.dataMovementComplete.isSet()) params.dataMovementComplete.send(Void()); return Void(); @@ -888,10 +888,14 @@ Future> DDMockTxnProcessor::getWorkers() const { return Future>(); } -void DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params, std::map& tssMapping) { +ACTOR Future rawStartMovement(std::shared_ptr mgs, + MoveKeysParams params, + std::map tssMapping) { // There won’t be parallel rawStart or rawFinish in mock world due to the fact the following *mock* transaction code // will always finish without coroutine switch. ASSERT(params.startMoveKeysParallelismLock->activePermits() == 0); + wait(params.startMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch)); + state FlowLock::Releaser releaser(*params.startMoveKeysParallelismLock); std::vector destTeams; destTeams.emplace_back(params.destinationTeam, true); @@ -920,13 +924,22 @@ void DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params, std::maprestrictSize); server.signalFetchKeys(params.keys, randomRangeSize); } + return Void(); } -void DDMockTxnProcessor::rawFinishMovement(MoveKeysParams& params, - const std::map& tssMapping) { +Future DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params, + std::map& tssMapping) { + return ::rawStartMovement(mgs, params, tssMapping); +} + +ACTOR Future rawFinishMovement(std::shared_ptr mgs, + MoveKeysParams params, + std::map tssMapping) { // There won’t be parallel rawStart or rawFinish in mock world due to the fact the following *mock* transaction code // will always finish without coroutine switch. ASSERT(params.finishMoveKeysParallelismLock->activePermits() == 0); + wait(params.finishMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch)); + state FlowLock::Releaser releaser(*params.finishMoveKeysParallelismLock); // get source and dest teams auto [destTeams, srcTeams] = mgs->shardMapping->getTeamsForFirstShard(params.keys); @@ -953,4 +966,10 @@ void DDMockTxnProcessor::rawFinishMovement(MoveKeysParams& params, } mgs->shardMapping->finishMove(params.keys); mgs->shardMapping->defineShard(params.keys); // coalesce for merge + return Void(); +} + +Future DDMockTxnProcessor::rawFinishMovement(MoveKeysParams& params, + const std::map& tssMapping) { + return ::rawFinishMovement(mgs, params, tssMapping); } diff --git a/fdbserver/include/fdbserver/DDTxnProcessor.h b/fdbserver/include/fdbserver/DDTxnProcessor.h index 0142c95183..09a9f48160 100644 --- a/fdbserver/include/fdbserver/DDTxnProcessor.h +++ b/fdbserver/include/fdbserver/DDTxnProcessor.h @@ -292,9 +292,9 @@ public: Future> getWorkers() const override; protected: - void rawStartMovement(MoveKeysParams& params, std::map& tssMapping); + Future rawStartMovement(MoveKeysParams& params, std::map& tssMapping); - void rawFinishMovement(MoveKeysParams& params, const std::map& tssMapping); + Future rawFinishMovement(MoveKeysParams& params, const std::map& tssMapping); }; #endif // FOUNDATIONDB_DDTXNPROCESSOR_H diff --git a/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp b/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp index aceae94ad7..bf5eccfa91 100644 --- a/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp +++ b/fdbserver/workloads/IDDTxnProcessorApiCorrectness.actor.cpp @@ -80,12 +80,12 @@ void verifyInitDataEqual(Reference real, Reference mgs = nullptr) : DDMockTxnProcessor(mgs) {} - void testRawStartMovement(MoveKeysParams& params, std::map& tssMapping) { - rawStartMovement(params, tssMapping); + Future testRawStartMovement(MoveKeysParams& params, std::map& tssMapping) { + return rawStartMovement(params, tssMapping); } - void testRawFinishMovement(MoveKeysParams& params, const std::map& tssMapping) { - rawFinishMovement(params, tssMapping); + Future testRawFinishMovement(MoveKeysParams& params, const std::map& tssMapping) { + return rawFinishMovement(params, tssMapping); } }; @@ -94,12 +94,12 @@ public: explicit DDTxnProcessorTester(Database cx) : DDTxnProcessor(cx) {} Future testRawStartMovement(MoveKeysParams& params, std::map& tssMapping) { - return this->rawStartMovement(params, tssMapping); + return rawStartMovement(params, tssMapping); } Future testRawFinishMovement(MoveKeysParams& params, const std::map& tssMapping) { - return this->rawFinishMovement(params, tssMapping); + return rawFinishMovement(params, tssMapping); } }; @@ -279,7 +279,7 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { wait(store(params.lock, self->real->takeMoveKeysLock(UID()))); try { // test start - self->mock->testRawStartMovement(params, emptyTssMapping); + wait(self->mock->testRawStartMovement(params, emptyTssMapping)); wait(self->real->testRawStartMovement(params, emptyTssMapping)); // test finish or started but cancelled movement @@ -288,7 +288,7 @@ struct IDDTxnProcessorApiWorkload : TestWorkload { break; } - self->mock->testRawFinishMovement(params, emptyTssMapping); + wait(self->mock->testRawFinishMovement(params, emptyTssMapping)); wait(self->real->testRawFinishMovement(params, emptyTssMapping)); break; } catch (Error& e) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c49e58b14c..bc850f3333 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -171,6 +171,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/GetEstimatedRangeSize.toml) add_fdb_test(TEST_FILES fast/GetMappedRange.toml) add_fdb_test(TEST_FILES fast/IDDTxnProcessorRawStartMovement.toml) + add_fdb_test(TEST_FILES fast/IDDTxnProcessorMoveKeys.toml IGNORE) add_fdb_test(TEST_FILES fast/PrivateEndpoints.toml) add_fdb_test(TEST_FILES fast/ProtocolVersion.toml) add_fdb_test(TEST_FILES fast/RandomSelector.toml) diff --git a/tests/fast/IDDTxnProcessorMoveKeys.toml b/tests/fast/IDDTxnProcessorMoveKeys.toml new file mode 100644 index 0000000000..9dedc67253 --- /dev/null +++ b/tests/fast/IDDTxnProcessorMoveKeys.toml @@ -0,0 +1,13 @@ +[configuration] +generateFearless = false # prevent generating remote dc because in MGS there's no region setting yet +disableTss = true # There's no TSS in MGS this prevent the DD operate TSS mapping + +[[knobs]] +max_added_sources_multiplier = 0 # set to 0 because it's impossible to make sure SS and mock SS will finish fetch keys at the same time. + +[[test]] +testTitle = 'IDDTxnProcessorMoveKeys' + + [[test.workload]] + testName = 'IDDTxnProcessorApiCorrectness' + testDuration = 50.0 diff --git a/tests/fast/IDDTxnProcessorRawStartMovement.toml b/tests/fast/IDDTxnProcessorRawStartMovement.toml index 8bec1e456a..73109583ee 100644 --- a/tests/fast/IDDTxnProcessorRawStartMovement.toml +++ b/tests/fast/IDDTxnProcessorRawStartMovement.toml @@ -6,7 +6,7 @@ disableTss = true # There's no TSS in MGS this prevent the DD operate TSS mappin max_added_sources_multiplier = 0 # set to 0 because it's impossible to make sure SS and mock SS will finish fetch keys at the same time. [[test]] -testTitle = 'IDDTxnProcessorApiCorrectness' +testTitle = 'IDDTxnProcessorRawStartMovement' [[test.workload]] testName = 'IDDTxnProcessorApiCorrectness'