diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt
index 34ac56673b..70e42357f3 100644
--- a/fdbserver/CMakeLists.txt
+++ b/fdbserver/CMakeLists.txt
@@ -38,6 +38,7 @@ set(FDBSERVER_SRCS
   DBCoreState.h
   DDTeamCollection.actor.cpp
   DDTeamCollection.h
+  DDTxnProcessor.actor.cpp
   DiskQueue.actor.cpp
   EncryptKeyProxy.actor.cpp
   EncryptKeyProxyInterface.h
diff --git a/fdbserver/DDTxnProcessor.actor.cpp b/fdbserver/DDTxnProcessor.actor.cpp
new file mode 100644
index 0000000000..41dedb412f
--- /dev/null
+++ b/fdbserver/DDTxnProcessor.actor.cpp
@@ -0,0 +1,94 @@
+/*
+ * DDTxnProcessor.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbserver/DDTxnProcessor.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
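+// Implementation class for DDTxnProcessor: its public methods forward to the static actors defined here.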
+class DDTxnProcessorImpl {
+	friend class DDTxnProcessor;
+
+	// return {sourceServers, completeSources}
+	ACTOR static Future<IDDTxnProcessor::SourceServers> getSourceServersForRange(Database cx, KeyRangeRef keys) {
+		state std::set<UID> servers;
+		state std::vector<UID> completeSources;
+		state Transaction tr(cx);
+
+		loop {
+			servers.clear();
+			completeSources.clear();
+
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+			tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
+			try {
+				state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
+				ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
+				RangeResult keyServersEntries = wait(tr.getRange(lastLessOrEqual(keyServersKey(keys.begin)),
+				                                                 firstGreaterOrEqual(keyServersKey(keys.end)),
+				                                                 SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS));
+
+				if (keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
+					for (int shard = 0; shard < keyServersEntries.size(); shard++) {
+						std::vector<UID> src, dest;
+						decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
+						ASSERT(src.size());
+						for (int i = 0; i < src.size(); i++) {
+							servers.insert(src[i]);
+						}
+						if (shard == 0) {
+							completeSources = src;
+						} else {
+							for (int i = 0; i < completeSources.size(); i++) {
+								if (std::find(src.begin(), src.end(), completeSources[i]) == src.end()) {
+									swapAndPop(&completeSources, i--);
+								}
+							}
+						}
+					}
+
+					ASSERT(servers.size() > 0);
+				}
+
+				// If keyServersEntries is too large, just assume we are using all storage servers.
+				// Why can it be large? When a shard is in flight and DD crashes, some destination servers
+				// may already have received the data, so the new DD treats those destination servers as
+				// source servers as well, which can make the list large.
+				else {
+					RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
+					ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
+
+					for (auto s = serverList.begin(); s != serverList.end(); ++s)
+						servers.insert(decodeServerListValue(s->value).id());
+
+					ASSERT(servers.size() > 0);
+				}
+
+				break;
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+
+		return IDDTxnProcessor::SourceServers{ std::vector<UID>(servers.begin(), servers.end()), completeSources };
+	}
+};
+
+Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
+	return DDTxnProcessorImpl::getSourceServersForRange(cx, range);
+}
\ No newline at end of file
diff --git a/fdbserver/DDTxnProcessor.h b/fdbserver/DDTxnProcessor.h
new file mode 100644
index 0000000000..a819fdead6
--- /dev/null
+++ b/fdbserver/DDTxnProcessor.h
@@ -0,0 +1,54 @@
+/*
+ * DDTxnProcessor.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOUNDATIONDB_DDTXNPROCESSOR_H
+#define FOUNDATIONDB_DDTXNPROCESSOR_H
+
+#include "fdbserver/Knobs.h"
+#include "fdbserver/DataDistribution.actor.h"
+
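+// Interface for the transactions that data distribution issues, so callers can run against either the real
+// database (DDTxnProcessor) or a mock (DDMockTxnProcessor).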
+class IDDTxnProcessor {
+public:
+	struct SourceServers {
+		std::vector<UID> srcServers, completeSources; // the same as RelocateData.src and RelocateData.completeSources
+	};
+	// Get the source server list and the complete source server list for the given range.
+	virtual Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) = 0;
+	virtual ~IDDTxnProcessor() = default;
+};
+
+class DDTxnProcessorImpl;
+
+// Runs transactions against the real database.
+class DDTxnProcessor : public IDDTxnProcessor {
+	friend class DDTxnProcessorImpl;
+	Database cx;
+
+public:
+	DDTxnProcessor() = default;
+	explicit DDTxnProcessor(Database cx) : cx(cx) {}
+
+	Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) override;
+};
+
+// Runs mock transactions; currently an empty placeholder with no methods implemented.
+class DDMockTxnProcessor : public IDDTxnProcessor {};
+
+#endif // FOUNDATIONDB_DDTXNPROCESSOR_H
diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp
index 10d5c90357..da783eddc8 100644
--- a/fdbserver/DataDistributionQueue.actor.cpp
+++ b/fdbserver/DataDistributionQueue.actor.cpp
@@ -33,6 +33,7 @@
 #include "fdbserver/MoveKeys.actor.h"
 #include "fdbserver/Knobs.h"
 #include "fdbrpc/simulator.h"
+#include "fdbserver/DDTxnProcessor.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
 #define WORK_FULL_UTILIZATION 10000 // This is not a knob; it is a fixed point scaling factor!
@@ -445,6 +446,7 @@ struct DDQueueData {
 	UID distributorId;
 	MoveKeysLock lock;
 	Database cx;
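+	// Abstraction for issuing DD transactions; initialized in the constructor as a DDTxnProcessor over cx.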
+	std::shared_ptr<IDDTxnProcessor> txnProcessor;
 
 	std::vector<TeamCollectionInterface> teamCollections;
 	Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
@@ -550,8 +552,8 @@ struct DDQueueData {
 	            FutureStream<RelocateShard> input,
 	            PromiseStream<GetMetricsRequest> getShardMetrics,
 	            PromiseStream<GetTopKMetricsRequest> getTopKMetrics)
-	  : distributorId(mid), lock(lock), cx(cx), teamCollections(teamCollections), shardsAffectedByTeamFailure(sABTF),
-	    getAverageShardBytes(getAverageShardBytes),
+	  : distributorId(mid), lock(lock), cx(cx), txnProcessor(new DDTxnProcessor(cx)), teamCollections(teamCollections),
+	    shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes),
 	    startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
 	    finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
 	    fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), activeRelocations(0),
@@ -705,12 +707,11 @@ struct DDQueueData {
 		}
 	}
 
-	ACTOR Future<Void> getSourceServersForRange(Database cx,
-	                                            RelocateData input,
-	                                            PromiseStream<RelocateData> output,
-	                                            Reference<FlowLock> fetchLock) {
-		state std::set<UID> servers;
-		state Transaction tr(cx);
+	ACTOR static Future<Void> getSourceServersForRange(DDQueueData* self,
+	                                                   Database cx,
+	                                                   RelocateData input,
+	                                                   PromiseStream<RelocateData> output,
+	                                                   Reference<FlowLock> fetchLock) {
 
 		// FIXME: is the merge case needed
 		if (input.priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD) {
@@ -722,59 +723,9 @@ struct DDQueueData {
 		wait(fetchLock->take(TaskPriority::DataDistributionLaunch));
 		state FlowLock::Releaser releaser(*fetchLock);
 
-		loop {
-			servers.clear();
-			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-			try {
-				state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
-				ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
-				RangeResult keyServersEntries = wait(tr.getRange(lastLessOrEqual(keyServersKey(input.keys.begin)),
-				                                                 firstGreaterOrEqual(keyServersKey(input.keys.end)),
-				                                                 SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS));
-
-				if (keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
-					for (int shard = 0; shard < keyServersEntries.size(); shard++) {
-						std::vector<UID> src, dest;
-						decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
-						ASSERT(src.size());
-						for (int i = 0; i < src.size(); i++) {
-							servers.insert(src[i]);
-						}
-						if (shard == 0) {
-							input.completeSources = src;
-						} else {
-							for (int i = 0; i < input.completeSources.size(); i++) {
-								if (std::find(src.begin(), src.end(), input.completeSources[i]) == src.end()) {
-									swapAndPop(&input.completeSources, i--);
-								}
-							}
-						}
-					}
-
-					ASSERT(servers.size() > 0);
-				}
-
-				// If the size of keyServerEntries is large, then just assume we are using all storage servers
-				// Why the size can be large?
-				// When a shard is inflight and DD crashes, some destination servers may have already got the data.
-				// The new DD will treat the destination servers as source servers. So the size can be large.
-				else {
-					RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
-					ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
-
-					for (auto s = serverList.begin(); s != serverList.end(); ++s)
-						servers.insert(decodeServerListValue(s->value).id());
-
-					ASSERT(servers.size() > 0);
-				}
-
-				break;
-			} catch (Error& e) {
-				wait(tr.onError(e));
-			}
-		}
-
-		input.src = std::vector<UID>(servers.begin(), servers.end());
+		IDDTxnProcessor::SourceServers res = wait(self->txnProcessor->getSourceServersForRange(input.keys));
+		input.src = std::move(res.srcServers);
+		input.completeSources = std::move(res.completeSources);
 		output.send(input);
 		return Void();
 	}
@@ -864,8 +815,8 @@ struct DDQueueData {
 				startRelocation(rrs.priority, rrs.healthPriority);
 
 				fetchingSourcesQueue.insert(rrs);
-				getSourceActors.insert(rrs.keys,
-				                       getSourceServersForRange(cx, rrs, fetchSourceServersComplete, fetchSourceLock));
+				getSourceActors.insert(
+				    rrs.keys, getSourceServersForRange(this, cx, rrs, fetchSourceServersComplete, fetchSourceLock));
 			} else {
 				RelocateData newData(rrs);
 				newData.keys = affectedQueuedItems[r];
diff --git a/flow/genericactors.actor.cpp b/flow/genericactors.actor.cpp
index 6bb1e3fd8d..523e5d8027 100644
--- a/flow/genericactors.actor.cpp
+++ b/flow/genericactors.actor.cpp
@@ -180,3 +180,22 @@ TEST_CASE("/flow/genericactors/AsyncListener") {
 	ASSERT(!subscriber2.isReady());
 	return Void();
 }
+
+#if false
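+// Disabled along with storeTuple in genericactors.actor.h; re-enable both at the same time.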
+TEST_CASE("/flow/genericactors/generic/storeTuple") {
+	state std::vector<UID> resA;
+	state int resB;
+	state double resC;
+
+	state Promise<std::tuple<std::vector<UID>, int, double>> promise;
+
+	auto future = storeTuple(promise.getFuture(), resA, resB, resC);
+
+	promise.send(std::make_tuple(std::vector<UID>(10), 15, 2.0));
+	wait(ready(future));
+	ASSERT(resA.size() == 10);
+	ASSERT(resB == 15);
+	ASSERT(resC == 2.0);
+	return Void();
+}
+#endif
diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h
index dbbf98f07a..4955fb6212 100644
--- a/flow/genericactors.actor.h
+++ b/flow/genericactors.actor.h
@@ -300,6 +300,19 @@ Future<Void> store(T& out, Future<T> what) {
 	});
 }
 
+#if false
+// NOTE: Before using a tuple as a complex return type, consider whether a dedicated struct would be clearer.
+// If the return type is only used once, a struct may not be worth the boilerplate.
+// If this is re-enabled, also re-enable the unit test in genericactors.actor.cpp.
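+// Usage: wait(storeTuple(f, a, b, c)) assigns the elements of f's result tuple to a, b and c once f is ready.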
+template <class A, class... Bs>
+Future<Void> storeTuple(Future<std::tuple<A, Bs...>> what, A& a, Bs&... b) {
+	return map(what, [&](std::tuple<A, Bs...> const& v) {
+		std::tie(a, b...) = v;
+		return Void();
+	});
+}
+#endif
+
 template <class T>
 Future<Void> storeOrThrow(T& out, Future<Optional<T>> what, Error e = key_not_found()) {
 	return map(what, [&out, e](Optional<T> const& o) {