add storeTuple and unit test; refactor getSourceServersForRange
This commit is contained in:
parent
21e7e6d2ba
commit
6ab12ea971
|
@ -22,7 +22,10 @@
|
|||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
class DDTxnProcessorImpl {
|
||||
ACTOR static Future<std::pair<std::vector<UID>, std::vector<UID>>> getSourceServersForRange(
|
||||
friend class DDTxnProcessor;
|
||||
|
||||
// return {sourceServers, completeSources}
|
||||
ACTOR static Future<std::tuple<std::vector<UID>, std::vector<UID>>> getSourceServersForRange(
|
||||
Database cx,
|
||||
const KeyRangeRef keys) {
|
||||
state std::set<UID> servers;
|
||||
|
@ -31,6 +34,8 @@ class DDTxnProcessorImpl {
|
|||
|
||||
loop {
|
||||
servers.clear();
|
||||
completeSources.clear();
|
||||
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
try {
|
||||
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
|
@ -80,5 +85,12 @@ class DDTxnProcessorImpl {
|
|||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_tuple(std::vector<UID>(servers.begin(), servers.end()), completeSources);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
Future<std::tuple<std::vector<UID>, std::vector<UID>>> DDTxnProcessor::getSourceServersForRange(
|
||||
const KeyRangeRef range) {
|
||||
return DDTxnProcessorImpl::getSourceServersForRange(cx, range);
|
||||
}
|
|
@ -21,17 +21,30 @@
|
|||
#ifndef FOUNDATIONDB_DDTXNPROCESSOR_H
|
||||
#define FOUNDATIONDB_DDTXNPROCESSOR_H
|
||||
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/DataDistribution.actor.h"
|
||||
|
||||
class IDDTxnProcessor {
|
||||
public:
|
||||
// get the source server and complete source server for range
|
||||
virtual Future<std::pair<std::vector<UID>, std::vector<UID>>> getSourceServersForRange(const KeyRangeRef range) = 0;
|
||||
virtual Future<std::tuple<std::vector<UID>, std::vector<UID>>> getSourceServersForRange(
|
||||
const KeyRangeRef range) = 0;
|
||||
virtual ~IDDTxnProcessor() = default;
|
||||
};
|
||||
|
||||
class DDTxnProcessorImpl;
|
||||
|
||||
// run transactions over real database
|
||||
class DDTxnProcessor : public IDDTxnProcessor {};
|
||||
class DDTxnProcessor : public IDDTxnProcessor {
|
||||
friend class DDTxnProcessorImpl;
|
||||
Database cx;
|
||||
|
||||
public:
|
||||
DDTxnProcessor() = default;
|
||||
explicit DDTxnProcessor(Database cx) : cx(cx) {}
|
||||
|
||||
Future<std::tuple<std::vector<UID>, std::vector<UID>>> getSourceServersForRange(const KeyRangeRef range) override;
|
||||
};
|
||||
|
||||
// run mock transaction
|
||||
class DDMockTxnProcessor : public IDDTxnProcessor {};
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/DDTxnProcessor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
#define WORK_FULL_UTILIZATION 10000 // This is not a knob; it is a fixed point scaling factor!
|
||||
|
@ -445,6 +446,7 @@ struct DDQueueData {
|
|||
UID distributorId;
|
||||
MoveKeysLock lock;
|
||||
Database cx;
|
||||
std::unique_ptr<IDDTxnProcessor> txnProcessor;
|
||||
|
||||
std::vector<TeamCollectionInterface> teamCollections;
|
||||
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
|
||||
|
@ -552,8 +554,8 @@ struct DDQueueData {
|
|||
PromiseStream<GetMetricsRequest> getShardMetrics,
|
||||
PromiseStream<GetTopKMetricsRequest> getTopKMetrics,
|
||||
double* lastLimited)
|
||||
: distributorId(mid), lock(lock), cx(cx), teamCollections(teamCollections), shardsAffectedByTeamFailure(sABTF),
|
||||
getAverageShardBytes(getAverageShardBytes),
|
||||
: distributorId(mid), lock(lock), cx(cx), txnProcessor(new DDTxnProcessor(cx)), teamCollections(teamCollections),
|
||||
shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes),
|
||||
startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
|
||||
finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
|
||||
fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), activeRelocations(0),
|
||||
|
@ -707,12 +709,11 @@ struct DDQueueData {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getSourceServersForRange(Database cx,
|
||||
RelocateData input,
|
||||
PromiseStream<RelocateData> output,
|
||||
Reference<FlowLock> fetchLock) {
|
||||
state std::set<UID> servers;
|
||||
state Transaction tr(cx);
|
||||
ACTOR static Future<Void> getSourceServersForRange(DDQueueData* self,
|
||||
Database cx,
|
||||
RelocateData input,
|
||||
PromiseStream<RelocateData> output,
|
||||
Reference<FlowLock> fetchLock) {
|
||||
|
||||
// FIXME: is the merge case needed
|
||||
if (input.priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD) {
|
||||
|
@ -724,59 +725,9 @@ struct DDQueueData {
|
|||
wait(fetchLock->take(TaskPriority::DataDistributionLaunch));
|
||||
state FlowLock::Releaser releaser(*fetchLock);
|
||||
|
||||
loop {
|
||||
servers.clear();
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
try {
|
||||
state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
RangeResult keyServersEntries = wait(tr.getRange(lastLessOrEqual(keyServersKey(input.keys.begin)),
|
||||
firstGreaterOrEqual(keyServersKey(input.keys.end)),
|
||||
SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS));
|
||||
|
||||
if (keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
|
||||
for (int shard = 0; shard < keyServersEntries.size(); shard++) {
|
||||
std::vector<UID> src, dest;
|
||||
decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
|
||||
ASSERT(src.size());
|
||||
for (int i = 0; i < src.size(); i++) {
|
||||
servers.insert(src[i]);
|
||||
}
|
||||
if (shard == 0) {
|
||||
input.completeSources = src;
|
||||
} else {
|
||||
for (int i = 0; i < input.completeSources.size(); i++) {
|
||||
if (std::find(src.begin(), src.end(), input.completeSources[i]) == src.end()) {
|
||||
swapAndPop(&input.completeSources, i--);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(servers.size() > 0);
|
||||
}
|
||||
|
||||
// If the size of keyServerEntries is large, then just assume we are using all storage servers
|
||||
// Why the size can be large?
|
||||
// When a shard is inflight and DD crashes, some destination servers may have already got the data.
|
||||
// The new DD will treat the destination servers as source servers. So the size can be large.
|
||||
else {
|
||||
RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
for (auto s = serverList.begin(); s != serverList.end(); ++s)
|
||||
servers.insert(decodeServerListValue(s->value).id());
|
||||
|
||||
ASSERT(servers.size() > 0);
|
||||
}
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
input.src = std::vector<UID>(servers.begin(), servers.end());
|
||||
auto future =
|
||||
storeTuple(self->txnProcessor->getSourceServersForRange(input.keys), input.src, input.completeSources);
|
||||
wait(ready(future));
|
||||
output.send(input);
|
||||
return Void();
|
||||
}
|
||||
|
@ -866,8 +817,8 @@ struct DDQueueData {
|
|||
startRelocation(rrs.priority, rrs.healthPriority);
|
||||
|
||||
fetchingSourcesQueue.insert(rrs);
|
||||
getSourceActors.insert(rrs.keys,
|
||||
getSourceServersForRange(cx, rrs, fetchSourceServersComplete, fetchSourceLock));
|
||||
getSourceActors.insert(
|
||||
rrs.keys, getSourceServersForRange(this, cx, rrs, fetchSourceServersComplete, fetchSourceLock));
|
||||
} else {
|
||||
RelocateData newData(rrs);
|
||||
newData.keys = affectedQueuedItems[r];
|
||||
|
|
|
@ -180,3 +180,20 @@ TEST_CASE("/flow/genericactors/AsyncListener") {
|
|||
ASSERT(!subscriber2.isReady());
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/flow/genericactors/generic/storeTuple") {
|
||||
state std::vector<UID> resA;
|
||||
state int resB;
|
||||
state double resC;
|
||||
|
||||
state Promise<std::tuple<std::vector<UID>, int, double>> promise;
|
||||
|
||||
auto future = storeTuple(promise.getFuture(), resA, resB, resC);
|
||||
|
||||
promise.send(std::make_tuple(std::vector<UID>(10), 15, 2.0));
|
||||
wait(ready(future));
|
||||
ASSERT(resA.size() == 10);
|
||||
ASSERT(resB == 15);
|
||||
ASSERT(resC == 2.0);
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
// version.
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include <utility>
|
||||
#include <functional>
|
||||
#if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
|
||||
|
@ -300,6 +301,14 @@ Future<Void> store(T& out, Future<T> what) {
|
|||
});
|
||||
}
|
||||
|
||||
template <class A, class... Bs>
|
||||
Future<Void> storeTuple(Future<std::tuple<A, Bs...>> what, A& a, Bs&... b) {
|
||||
return map(what, [&](std::tuple<A, Bs...> const& v) {
|
||||
std::tie(a, b...) = v;
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
|
||||
template <class T>
|
||||
Future<Void> storeOrThrow(T& out, Future<Optional<T>> what, Error e = key_not_found()) {
|
||||
return map(what, [&out, e](Optional<T> const& o) {
|
||||
|
|
Loading…
Reference in New Issue