Merge pull request #7352 from sfc-gh-xwang/feature/ddtxn

[DD testability enhancement] Create IDDTxnProcessor and simple refactoring
This commit is contained in:
Trevor Clinkenbeard 2022-06-13 16:01:13 -07:00 committed by GitHub
commit 6bed046148
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 195 additions and 63 deletions

View File

@ -38,6 +38,7 @@ set(FDBSERVER_SRCS
DBCoreState.h
DDTeamCollection.actor.cpp
DDTeamCollection.h
DDTxnProcessor.actor.cpp
DiskQueue.actor.cpp
EncryptKeyProxy.actor.cpp
EncryptKeyProxyInterface.h

View File

@ -0,0 +1,94 @@
/*
* DDTxnProcessor.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/DDTxnProcessor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class DDTxnProcessorImpl {
friend class DDTxnProcessor;
// return {sourceServers, completeSources}
ACTOR static Future<IDDTxnProcessor::SourceServers> getSourceServersForRange(Database cx, KeyRangeRef keys) {
state std::set<UID> servers;
state std::vector<UID> completeSources;
state Transaction tr(cx);
loop {
servers.clear();
completeSources.clear();
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
try {
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
RangeResult keyServersEntries = wait(tr.getRange(lastLessOrEqual(keyServersKey(keys.begin)),
firstGreaterOrEqual(keyServersKey(keys.end)),
SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS));
if (keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
for (int shard = 0; shard < keyServersEntries.size(); shard++) {
std::vector<UID> src, dest;
decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
ASSERT(src.size());
for (int i = 0; i < src.size(); i++) {
servers.insert(src[i]);
}
if (shard == 0) {
completeSources = src;
} else {
for (int i = 0; i < completeSources.size(); i++) {
if (std::find(src.begin(), src.end(), completeSources[i]) == src.end()) {
swapAndPop(&completeSources, i--);
}
}
}
}
ASSERT(servers.size() > 0);
}
// If the size of keyServerEntries is large, then just assume we are using all storage servers
// Why the size can be large?
// When a shard is inflight and DD crashes, some destination servers may have already got the data.
// The new DD will treat the destination servers as source servers. So the size can be large.
else {
RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
for (auto s = serverList.begin(); s != serverList.end(); ++s)
servers.insert(decodeServerListValue(s->value).id());
ASSERT(servers.size() > 0);
}
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return IDDTxnProcessor::SourceServers{ std::vector<UID>(servers.begin(), servers.end()), completeSources };
}
};
Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
return DDTxnProcessorImpl::getSourceServersForRange(cx, range);
}

View File

@ -0,0 +1,54 @@
/*
* DDTxnProcessor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOUNDATIONDB_DDTXNPROCESSOR_H
#define FOUNDATIONDB_DDTXNPROCESSOR_H
#include "fdbserver/Knobs.h"
#include "fdbserver/DataDistribution.actor.h"
class IDDTxnProcessor {
public:
struct SourceServers {
std::vector<UID> srcServers, completeSources; // the same as RelocateData.src, RelocateData.completeSources;
};
// get the source server list and complete source server list for range
virtual Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) = 0;
virtual ~IDDTxnProcessor() = default;
};
class DDTxnProcessorImpl;
// run transactions over real database
class DDTxnProcessor : public IDDTxnProcessor {
friend class DDTxnProcessorImpl;
Database cx;
public:
DDTxnProcessor() = default;
explicit DDTxnProcessor(Database cx) : cx(cx) {}
Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) override;
};
// run mock transaction
class DDMockTxnProcessor : public IDDTxnProcessor {};
#endif // FOUNDATIONDB_DDTXNPROCESSOR_H

View File

@ -33,6 +33,7 @@
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/DDTxnProcessor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define WORK_FULL_UTILIZATION 10000 // This is not a knob; it is a fixed point scaling factor!
@ -445,6 +446,7 @@ struct DDQueueData {
UID distributorId;
MoveKeysLock lock;
Database cx;
std::shared_ptr<IDDTxnProcessor> txnProcessor;
std::vector<TeamCollectionInterface> teamCollections;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
@ -550,8 +552,8 @@ struct DDQueueData {
FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetTopKMetricsRequest> getTopKMetrics)
: distributorId(mid), lock(lock), cx(cx), teamCollections(teamCollections), shardsAffectedByTeamFailure(sABTF),
getAverageShardBytes(getAverageShardBytes),
: distributorId(mid), lock(lock), cx(cx), txnProcessor(new DDTxnProcessor(cx)), teamCollections(teamCollections),
shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes),
startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), activeRelocations(0),
@ -705,12 +707,11 @@ struct DDQueueData {
}
}
ACTOR Future<Void> getSourceServersForRange(Database cx,
RelocateData input,
PromiseStream<RelocateData> output,
Reference<FlowLock> fetchLock) {
state std::set<UID> servers;
state Transaction tr(cx);
ACTOR static Future<Void> getSourceServersForRange(DDQueueData* self,
Database cx,
RelocateData input,
PromiseStream<RelocateData> output,
Reference<FlowLock> fetchLock) {
// FIXME: is the merge case needed
if (input.priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD) {
@ -722,59 +723,9 @@ struct DDQueueData {
wait(fetchLock->take(TaskPriority::DataDistributionLaunch));
state FlowLock::Releaser releaser(*fetchLock);
loop {
servers.clear();
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
try {
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
RangeResult keyServersEntries = wait(tr.getRange(lastLessOrEqual(keyServersKey(input.keys.begin)),
firstGreaterOrEqual(keyServersKey(input.keys.end)),
SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS));
if (keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
for (int shard = 0; shard < keyServersEntries.size(); shard++) {
std::vector<UID> src, dest;
decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
ASSERT(src.size());
for (int i = 0; i < src.size(); i++) {
servers.insert(src[i]);
}
if (shard == 0) {
input.completeSources = src;
} else {
for (int i = 0; i < input.completeSources.size(); i++) {
if (std::find(src.begin(), src.end(), input.completeSources[i]) == src.end()) {
swapAndPop(&input.completeSources, i--);
}
}
}
}
ASSERT(servers.size() > 0);
}
// If the size of keyServerEntries is large, then just assume we are using all storage servers
// Why the size can be large?
// When a shard is inflight and DD crashes, some destination servers may have already got the data.
// The new DD will treat the destination servers as source servers. So the size can be large.
else {
RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
for (auto s = serverList.begin(); s != serverList.end(); ++s)
servers.insert(decodeServerListValue(s->value).id());
ASSERT(servers.size() > 0);
}
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
input.src = std::vector<UID>(servers.begin(), servers.end());
IDDTxnProcessor::SourceServers res = wait(self->txnProcessor->getSourceServersForRange(input.keys));
input.src = std::move(res.srcServers);
input.completeSources = std::move(res.completeSources);
output.send(input);
return Void();
}
@ -864,8 +815,8 @@ struct DDQueueData {
startRelocation(rrs.priority, rrs.healthPriority);
fetchingSourcesQueue.insert(rrs);
getSourceActors.insert(rrs.keys,
getSourceServersForRange(cx, rrs, fetchSourceServersComplete, fetchSourceLock));
getSourceActors.insert(
rrs.keys, getSourceServersForRange(this, cx, rrs, fetchSourceServersComplete, fetchSourceLock));
} else {
RelocateData newData(rrs);
newData.keys = affectedQueuedItems[r];

View File

@ -180,3 +180,22 @@ TEST_CASE("/flow/genericactors/AsyncListener") {
ASSERT(!subscriber2.isReady());
return Void();
}
#if false
TEST_CASE("/flow/genericactors/generic/storeTuple") {
state std::vector<UID> resA;
state int resB;
state double resC;
state Promise<std::tuple<std::vector<UID>, int, double>> promise;
auto future = storeTuple(promise.getFuture(), resA, resB, resC);
promise.send(std::make_tuple(std::vector<UID>(10), 15, 2.0));
wait(ready(future));
ASSERT(resA.size() == 10);
ASSERT(resB == 15);
ASSERT(resC == 2.0);
return Void();
}
#endif

View File

@ -300,6 +300,19 @@ Future<Void> store(T& out, Future<T> what) {
});
}
#if false
// NOTE: Think twice whether create a new struct for a complex return type is better before using tuple.
// If we just use the return type once, is it worth to create a new struct?
// And enable the unit test in genericactors.actor.cpp
template <class A, class... Bs>
Future<Void> storeTuple(Future<std::tuple<A, Bs...>> what, A& a, Bs&... b) {
return map(what, [&](std::tuple<A, Bs...> const& v) {
std::tie(a, b...) = v;
return Void();
});
}
#endif
template <class T>
Future<Void> storeOrThrow(T& out, Future<Optional<T>> what, Error e = key_not_found()) {
return map(what, [&out, e](Optional<T> const& o) {