2022-06-08 02:58:16 +08:00
|
|
|
|
/*
|
|
|
|
|
* DDTxnProcessor.actor.cpp
|
|
|
|
|
*
|
|
|
|
|
* This source file is part of the FoundationDB open source project
|
|
|
|
|
*
|
|
|
|
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
|
|
|
|
*
|
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
|
*
|
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
*
|
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
|
* limitations under the License.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#include "fdbserver/DDTxnProcessor.h"
|
2022-06-24 06:28:45 +08:00
|
|
|
|
#include "fdbclient/NativeAPI.actor.h"
|
2022-07-09 07:01:23 +08:00
|
|
|
|
#include "fdbclient/ManagementAPI.actor.h"
|
2022-07-12 12:54:47 +08:00
|
|
|
|
#include "fdbserver/DataDistribution.actor.h"
|
2022-09-22 01:56:22 +08:00
|
|
|
|
#include "fdbclient/DatabaseContext.h"
|
2023-05-05 01:42:21 +08:00
|
|
|
|
#include "flow/genericactors.actor.h"
|
2022-06-08 02:58:16 +08:00
|
|
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
|
|
|
|
|
2023-05-05 00:49:34 +08:00
|
|
|
|
static void updateServersAndCompleteSources(std::set<UID>& servers,
|
|
|
|
|
std::vector<UID>& completeSources,
|
|
|
|
|
int shard,
|
|
|
|
|
const std::vector<UID>& src) {
|
|
|
|
|
servers.insert(src.begin(), src.end());
|
|
|
|
|
if (shard == 0) {
|
|
|
|
|
completeSources = src;
|
|
|
|
|
} else {
|
|
|
|
|
for (int i = 0; i < completeSources.size(); i++) {
|
|
|
|
|
if (std::find(src.begin(), src.end(), completeSources[i]) == src.end()) {
|
|
|
|
|
swapAndPop(&completeSources, i--);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-06-08 02:58:16 +08:00
|
|
|
|
class DDTxnProcessorImpl {
|
2022-06-10 03:16:12 +08:00
|
|
|
|
friend class DDTxnProcessor;
|
|
|
|
|
|
2022-10-04 13:24:35 +08:00
|
|
|
|
	// Fetches the storage server list (with process classes) in a retry loop,
	// also recording the read version at which the snapshot was taken.
	ACTOR static Future<ServerWorkerInfos> getServerListAndProcessClasses(Database cx) {
		state Transaction tr(cx);
		state ServerWorkerInfos res;
		loop {
			// System-keyspace read at immediate priority; lock-aware so it works on a locked cluster.
			tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
			tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			try {
				wait(store(res.servers, NativeAPI::getServerListAndProcessClasses(&tr)));
				// getReadVersion() is already set once the read above completed, so .get() is safe here.
				res.readVersion = tr.getReadVersion().get();
				return res;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
|
|
|
|
|
|
2022-06-10 03:16:12 +08:00
|
|
|
|
	// return {sourceServers, completeSources}
	// Reads keyServers for `keys` and returns:
	//  - sourceServers: the union of all source servers for shards overlapping the range
	//  - completeSources: the intersection (servers holding every overlapping shard)
	// If the range overlaps too many shards (>= DD_QUEUE_MAX_KEY_SERVERS), falls back to
	// treating ALL storage servers as sources (completeSources left empty in that case).
	ACTOR static Future<IDDTxnProcessor::SourceServers> getSourceServersForRange(Database cx, KeyRangeRef keys) {
		state std::set<UID> servers;
		state std::vector<UID> completeSources;
		state Transaction tr(cx);

		loop {
			// Reset accumulators on retry so results are not duplicated.
			servers.clear();
			completeSources.clear();

			tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
			tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			try {
				// Tag map is needed to decode the keyServers values below.
				state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
				ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
				// lastLessOrEqual on begin ensures the shard containing keys.begin is included.
				RangeResult keyServersEntries = wait(tr.getRange(lastLessOrEqual(keyServersKey(keys.begin)),
				                                                 firstGreaterOrEqual(keyServersKey(keys.end)),
				                                                 SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS));

				if (keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
					for (int shard = 0; shard < keyServersEntries.size(); shard++) {
						std::vector<UID> src, dest;
						decodeKeyServersValue(UIDtoTagMap, keyServersEntries[shard].value, src, dest);
						ASSERT(src.size());
						updateServersAndCompleteSources(servers, completeSources, shard, src);
					}

					ASSERT(servers.size() > 0);
				}

				// If the size of keyServerEntries is large, then just assume we are using all storage servers
				// Why the size can be large?
				// When a shard is inflight and DD crashes, some destination servers may have already got the data.
				// The new DD will treat the destination servers as source servers. So the size can be large.
				else {
					RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
					ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);

					for (auto s = serverList.begin(); s != serverList.end(); ++s)
						servers.insert(decodeServerListValue(s->value).id());

					ASSERT(servers.size() > 0);
				}

				break;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}

		return IDDTxnProcessor::SourceServers{ std::vector<UID>(servers.begin(), servers.end()), completeSources };
	}
|
2022-07-12 02:16:17 +08:00
|
|
|
|
|
2022-09-14 00:28:41 +08:00
|
|
|
|
	// For every shard overlapping `range`, returns a DDRangeLocations entry mapping
	// datacenter id -> interfaces of that shard's source storage servers.
	// Servers whose serverList entry is missing are skipped with a SevWarnAlways trace.
	ACTOR static Future<std::vector<IDDTxnProcessor::DDRangeLocations>> getSourceServerInterfacesForRange(
	    Database cx,
	    KeyRangeRef range) {
		state std::vector<IDDTxnProcessor::DDRangeLocations> res;
		state Transaction tr(cx);
		tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
		tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
		tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

		loop {
			// Reset on retry so results are not duplicated.
			res.clear();
			try {
				state RangeResult shards = wait(krmGetRanges(&tr,
				                                             keyServersPrefix,
				                                             range,
				                                             SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT,
				                                             SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT));
				ASSERT(!shards.empty());

				// Needed to decode keyServers values.
				state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
				ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);

				// krmGetRanges returns boundary keys; entry i spans [shards[i].key, shards[i+1].key).
				state int i = 0;
				for (i = 0; i < shards.size() - 1; ++i) {
					state std::vector<UID> src;
					std::vector<UID> dest;
					UID srcId, destId;
					decodeKeyServersValue(UIDtoTagMap, shards[i].value, src, dest, srcId, destId);

					// Fetch all source servers' interfaces in parallel.
					std::vector<Future<Optional<Value>>> serverListEntries;
					for (int j = 0; j < src.size(); ++j) {
						serverListEntries.push_back(tr.get(serverListKeyFor(src[j])));
					}
					std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
					IDDTxnProcessor::DDRangeLocations current(KeyRangeRef(shards[i].key, shards[i + 1].key));
					for (int j = 0; j < serverListValues.size(); ++j) {
						if (!serverListValues[j].present()) {
							// Source server has no serverList entry; skip it rather than fail.
							TraceEvent(SevWarnAlways, "GetSourceServerInterfacesMissing")
							    .detail("StorageServer", src[j])
							    .detail("Range", KeyRangeRef(shards[i].key, shards[i + 1].key));
							continue;
						}
						StorageServerInterface ssi = decodeServerListValue(serverListValues[j].get());
						current.servers[ssi.locality.describeDcId()].push_back(ssi);
					}
					res.push_back(current);
				}
				break;
			} catch (Error& e) {
				TraceEvent(SevWarnAlways, "GetSourceServerInterfacesError").errorUnsuppressed(e).detail("Range", range);
				wait(tr.onError(e));
			}
		}

		return res;
	}
|
|
|
|
|
|
2022-07-12 02:16:17 +08:00
|
|
|
|
	// set the system key space
	// Reconciles the datacenterReplicas system keys with the current configuration:
	// caps the recorded replica count for the primary DC (and the remote DC when
	// usableRegions > 1) at storageTeamSize, and clears entries for any other DC.
	ACTOR static Future<Void> updateReplicaKeys(Database cx,
	                                            std::vector<Optional<Key>> primaryDcId,
	                                            std::vector<Optional<Key>> remoteDcIds,
	                                            DatabaseConfiguration configuration) {
		state Transaction tr(cx);
		loop {
			try {
				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

				RangeResult replicaKeys = wait(tr.getRange(datacenterReplicasKeys, CLIENT_KNOBS->TOO_MANY));

				for (auto& kv : replicaKeys) {
					auto dcId = decodeDatacenterReplicasKey(kv.key);
					auto replicas = decodeDatacenterReplicasValue(kv.value);
					// Only the first entry of each DC-id list is consulted here.
					if ((primaryDcId.size() && primaryDcId.at(0) == dcId) ||
					    (remoteDcIds.size() && remoteDcIds.at(0) == dcId && configuration.usableRegions > 1)) {
						// Never record more replicas than the configured team size.
						if (replicas > configuration.storageTeamSize) {
							tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize));
						}
					} else {
						// DC no longer in the configuration: drop its replica record.
						tr.clear(kv.key);
					}
				}

				wait(tr.commit());
				break;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
		return Void();
	}
|
2022-07-12 12:54:47 +08:00
|
|
|
|
|
2022-09-27 14:00:31 +08:00
|
|
|
|
	// Sets the datacenterReplicas entry for `dcId` to `storageTeamSize` and returns the
	// previous replica count (0 if the key was absent). If the replica count is being
	// increased, also writes rebootWhenDurableKey in the same transaction.
	// No-op (no write) when the stored value already equals storageTeamSize.
	ACTOR static Future<int> tryUpdateReplicasKeyForDc(Database cx, Optional<Key> dcId, int storageTeamSize) {
		state Transaction tr(cx);
		loop {
			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

			try {
				Optional<Value> val = wait(tr.get(datacenterReplicasKeyFor(dcId)));
				state int oldReplicas = val.present() ? decodeDatacenterReplicasValue(val.get()) : 0;
				if (oldReplicas == storageTeamSize) {
					// Already up to date; nothing to commit.
					return oldReplicas;
				}
				if (oldReplicas < storageTeamSize) {
					tr.set(rebootWhenDurableKey, StringRef());
				}
				tr.set(datacenterReplicasKeyFor(dcId), datacenterReplicasValue(storageTeamSize));
				wait(tr.commit());

				return oldReplicas;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
|
|
|
|
|
|
2022-07-12 12:54:47 +08:00
|
|
|
|
	// Read keyservers, return unique set of teams
	// Bootstraps the data distributor's view of the cluster. In one retry loop it reads the
	// healthy-zone value, the DD mode, the server list (split into SS and TSS), in-flight
	// data moves, and audit states; in a second (possibly multi-transaction) loop it scans
	// all keyServers entries to build the shard list and the unique primary/remote teams.
	// Returns early with a mostly-empty result if DD is disabled (unless skipDDModeCheck).
	ACTOR static Future<Reference<InitialDataDistribution>> getInitialDataDistribution(
	    Database cx,
	    UID distributorId,
	    MoveKeysLock moveKeysLock,
	    std::vector<Optional<Key>> remoteDcIds,
	    const DDEnabledState* ddEnabledState,
	    SkipDDModeCheck skipDDModeCheck) {
		state Reference<InitialDataDistribution> result = makeReference<InitialDataDistribution>();
		state Key beginKey = allKeys.begin;

		state bool succeeded;

		state Transaction tr(cx);

		if (ddLargeTeamEnabled()) {
			wait(store(result->userRangeConfig,
			           DDConfiguration().userRangeConfig().getSnapshot(
			               SystemDBWriteLockedNow(cx.getReference()), allKeys.begin, allKeys.end)));
		}
		state std::map<UID, Optional<Key>> server_dc;
		state std::map<std::vector<UID>, std::pair<std::vector<UID>, std::vector<UID>>> team_cache;
		state std::vector<std::pair<StorageServerInterface, ProcessClass>> tss_servers;
		state int numDataMoves = 0;

		CODE_PROBE((bool)skipDDModeCheck, "DD Mode won't prevent read initial data distribution.");
		// Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure
		// causing entries to be duplicated
		loop {
			// Reset everything this transaction populates so a retry starts clean.
			numDataMoves = 0;
			server_dc.clear();
			result->allServers.clear();
			result->dataMoveMap = KeyRangeMap<std::shared_ptr<DataMove>>(std::make_shared<DataMove>());
			result->auditStates.clear();
			tss_servers.clear();
			team_cache.clear();
			succeeded = false;
			try {
				// Read healthyZone value which is later used to determine on/off of failure triggered DD
				tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
				tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
				Optional<Value> val = wait(tr.get(healthyZoneKey));
				if (val.present()) {
					auto p = decodeHealthyZoneValue(val.get());
					// Honor the zone only if it has not expired, or it is the special
					// ignore-SS-failures marker.
					if (p.second > tr.getReadVersion().get() || p.first == ignoreSSFailuresZoneString) {
						result->initHealthyZoneValue = Optional<Key>(p.first);
					} else {
						result->initHealthyZoneValue = Optional<Key>();
					}
				} else {
					result->initHealthyZoneValue = Optional<Key>();
				}

				// Absent mode key means DD is enabled (mode 1).
				result->mode = 1;
				Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
				if (mode.present()) {
					BinaryReader rd(mode.get(), Unversioned());
					rd >> result->mode;
				}
				if ((!skipDDModeCheck && !result->mode) || !ddEnabledState->isEnabled()) {
					// DD can be disabled persistently (result->mode = 0) or transiently (isEnabled() = 0)
					TraceEvent(SevDebug, "GetInitialDataDistribution_DisabledDD").log();
					return result;
				}

				// Fetch workers and server list concurrently.
				state Future<std::vector<ProcessData>> workers = getWorkers(&tr);
				state Future<RangeResult> serverList = tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
				wait(success(workers) && success(serverList));
				ASSERT(!serverList.get().more && serverList.get().size() < CLIENT_KNOBS->TOO_MANY);

				// Map processId -> worker data so each SS can be paired with its process class.
				std::map<Optional<Standalone<StringRef>>, ProcessData> id_data;
				for (int i = 0; i < workers.get().size(); i++)
					id_data[workers.get()[i].locality.processId()] = workers.get()[i];

				// Split servers into regular storage servers and TSS; TSS are appended later,
				// after teams are built.
				for (int i = 0; i < serverList.get().size(); i++) {
					auto ssi = decodeServerListValue(serverList.get()[i].value);
					if (!ssi.isTss()) {
						result->allServers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
						server_dc[ssi.id()] = ssi.locality.dcId();
					} else {
						tss_servers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
					}
				}

				RangeResult dms = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
				ASSERT(!dms.more && dms.size() < CLIENT_KNOBS->TOO_MANY);
				// For each data move, find out the src or dst servers are in primary or remote DC.
				for (int i = 0; i < dms.size(); ++i) {
					auto dataMove = std::make_shared<DataMove>(decodeDataMoveValue(dms[i].value), true);
					const DataMoveMetaData& meta = dataMove->meta;
					if (meta.ranges.empty()) {
						// Any persisted datamove with an empty range must be an tombstone persisted by
						// a background cleanup (with retry_clean_up_datamove_tombstone_added),
						// and this datamove must be in DataMoveMetaData::Deleting state
						// A datamove without processed by a background cleanup must have a non-empty range
						// For this case, we simply clear the range when dd init
						ASSERT(meta.getPhase() == DataMoveMetaData::Deleting);
						result->toCleanDataMoveTombstone.push_back(meta.id);
						continue;
					}
					ASSERT(!meta.ranges.empty());
					for (const UID& id : meta.src) {
						auto& dc = server_dc[id];
						if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
							dataMove->remoteSrc.push_back(id);
						} else {
							dataMove->primarySrc.push_back(id);
						}
					}
					for (const UID& id : meta.dest) {
						auto& dc = server_dc[id];
						if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
							dataMove->remoteDest.push_back(id);
						} else {
							dataMove->primaryDest.push_back(id);
						}
					}
					// Sorted team vectors allow direct comparison elsewhere.
					std::sort(dataMove->primarySrc.begin(), dataMove->primarySrc.end());
					std::sort(dataMove->remoteSrc.begin(), dataMove->remoteSrc.end());
					std::sort(dataMove->primaryDest.begin(), dataMove->primaryDest.end());
					std::sort(dataMove->remoteDest.begin(), dataMove->remoteDest.end());

					// Data moves must not overlap an already-valid entry.
					auto ranges = result->dataMoveMap.intersectingRanges(meta.ranges.front());
					for (auto& r : ranges) {
						ASSERT(!r.value()->valid);
					}
					result->dataMoveMap.insert(meta.ranges.front(), std::move(dataMove));
					++numDataMoves;
				}

				// Load persisted audit-storage states.
				RangeResult ads = wait(tr.getRange(auditKeys, CLIENT_KNOBS->TOO_MANY));
				ASSERT(!ads.more && ads.size() < CLIENT_KNOBS->TOO_MANY);
				for (int i = 0; i < ads.size(); ++i) {
					auto auditState = decodeAuditStorageState(ads[i].value);
					result->auditStates.push_back(auditState);
				}

				succeeded = true;

				break;
			} catch (Error& e) {
				TraceEvent("GetInitialTeamsRetry", distributorId).error(e);
				wait(tr.onError(e));

				ASSERT(!succeeded); // We shouldn't be retrying if we have already started modifying result in this loop
			}
		}

		// If keyServers is too large to read in a single transaction, then we will have to break this process up into
		// multiple transactions. In that case, each iteration should begin where the previous left off
		while (beginKey < allKeys.end) {
			CODE_PROBE(beginKey > allKeys.begin, "Multi-transactional getInitialDataDistribution");
			loop {
				succeeded = false;
				try {
					tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
					tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
					// Verify we still hold the moveKeys lock before trusting this snapshot.
					wait(checkMoveKeysLockReadOnly(&tr, moveKeysLock, ddEnabledState));
					state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
					ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
					RangeResult keyServers = wait(krmGetRanges(&tr,
					                                           keyServersPrefix,
					                                           KeyRangeRef(beginKey, allKeys.end),
					                                           SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
					                                           SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
					succeeded = true;

					std::vector<UID> src, dest, last;
					UID srcId, destId;

					// for each range
					for (int i = 0; i < keyServers.size() - 1; i++) {
						decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest, srcId, destId);
						DDShardInfo info(keyServers[i].key, srcId, destId);
						if (remoteDcIds.size()) {
							// Multi-region: split each team into primary/remote members, using
							// team_cache to dedupe identical teams across shards.
							auto srcIter = team_cache.find(src);
							if (srcIter == team_cache.end()) {
								for (auto& id : src) {
									auto& dc = server_dc[id];
									if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
										info.remoteSrc.push_back(id);
									} else {
										info.primarySrc.push_back(id);
									}
								}
								result->primaryTeams.insert(info.primarySrc);
								result->remoteTeams.insert(info.remoteSrc);
								team_cache[src] = std::make_pair(info.primarySrc, info.remoteSrc);
							} else {
								info.primarySrc = srcIter->second.first;
								info.remoteSrc = srcIter->second.second;
							}
							if (dest.size()) {
								info.hasDest = true;
								auto destIter = team_cache.find(dest);
								if (destIter == team_cache.end()) {
									for (auto& id : dest) {
										auto& dc = server_dc[id];
										if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) !=
										    remoteDcIds.end()) {
											info.remoteDest.push_back(id);
										} else {
											info.primaryDest.push_back(id);
										}
									}
									result->primaryTeams.insert(info.primaryDest);
									result->remoteTeams.insert(info.remoteDest);
									team_cache[dest] = std::make_pair(info.primaryDest, info.remoteDest);
								} else {
									info.primaryDest = destIter->second.first;
									info.remoteDest = destIter->second.second;
								}
							}
						} else {
							// Single-region: every team is a primary team.
							info.primarySrc = src;
							auto srcIter = team_cache.find(src);
							if (srcIter == team_cache.end()) {
								result->primaryTeams.insert(src);
								team_cache[src] = std::pair<std::vector<UID>, std::vector<UID>>();
							}
							if (dest.size()) {
								info.hasDest = true;
								info.primaryDest = dest;
								auto destIter = team_cache.find(dest);
								if (destIter == team_cache.end()) {
									result->primaryTeams.insert(dest);
									team_cache[dest] = std::pair<std::vector<UID>, std::vector<UID>>();
								}
							}
						}
						result->shards.push_back(info);
					}

					ASSERT_GT(keyServers.size(), 0);
					// Resume the scan from the last boundary returned by this transaction.
					beginKey = keyServers.end()[-1].key;
					break;
				} catch (Error& e) {
					TraceEvent("GetInitialTeamsKeyServersRetry", distributorId).error(e);

					wait(tr.onError(e));
					ASSERT(!succeeded); // We shouldn't be retrying if we have already started modifying result in this
					                    // loop
				}
			}

			tr.reset();
		}

		// a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
		result->shards.push_back(DDShardInfo(allKeys.end));

		// With physical-shard metadata enabled, cross-check each shard against the data moves read above.
		if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) {
			for (int shard = 0; shard < result->shards.size() - 1; ++shard) {
				const DDShardInfo& iShard = result->shards[shard];
				KeyRangeRef keys = KeyRangeRef(iShard.key, result->shards[shard + 1].key);
				result->dataMoveMap[keys.begin]->validateShard(iShard, keys);
			}
		}

		// add tss to server list AFTER teams are built
		for (auto& it : tss_servers) {
			result->allServers.push_back(it);
		}

		return result;
	}
|
2022-07-13 05:28:49 +08:00
|
|
|
|
|
2022-07-13 02:25:59 +08:00
|
|
|
|
	// Polls until data distribution is enabled: the in-memory ddEnabledState must be
	// enabled AND the persisted dataDistributionModeKey must be absent or non-zero.
	// Polling interval is DD_ENABLED_CHECK_DELAY.
	ACTOR static Future<Void> waitForDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
		state Transaction tr(cx);
		loop {
			wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY, TaskPriority::DataDistribution));

			tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
			tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

			try {
				Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
				// Absent mode key counts as enabled.
				if (!mode.present() && ddEnabledState->isEnabled()) {
					TraceEvent("WaitForDDEnabledSucceeded").log();
					return Void();
				}
				if (mode.present()) {
					BinaryReader rd(mode.get(), Unversioned());
					int m;
					rd >> m;
					TraceEvent(SevDebug, "WaitForDDEnabled")
					    .detail("Mode", m)
					    .detail("IsDDEnabled", ddEnabledState->isEnabled());
					if (m && ddEnabledState->isEnabled()) {
						TraceEvent("WaitForDDEnabledSucceeded").log();
						return Void();
					}
				}

				// Not enabled yet; reset and poll again after the delay.
				tr.reset();
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
|
2022-08-02 04:26:58 +08:00
|
|
|
|
|
2022-08-19 01:23:33 +08:00
|
|
|
|
	// Single-shot check of whether data distribution is enabled. Returns true when the
	// in-memory state is enabled and either the mode key is absent/non-zero, or the
	// moveKeys lock is not held by the DD-mode lock owner; false otherwise.
	ACTOR static Future<bool> isDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
		state Transaction tr(cx);
		loop {
			tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);

			try {
				Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
				// Absent mode key counts as enabled.
				if (!mode.present() && ddEnabledState->isEnabled())
					return true;
				if (mode.present()) {
					BinaryReader rd(mode.get(), Unversioned());
					int m;
					rd >> m;
					if (m && ddEnabledState->isEnabled()) {
						TraceEvent(SevDebug, "IsDDEnabledSucceeded")
						    .detail("Mode", m)
						    .detail("IsDDEnabled", ddEnabledState->isEnabled());
						return true;
					}
				}
				// SOMEDAY: Write a wrapper in MoveKeys.actor.h
				Optional<Value> readVal = wait(tr.get(moveKeysLockOwnerKey));
				UID currentOwner =
				    readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
				// Mode is 0, but if the lock is not owned by the DD-mode lock the mode key
				// is considered stale and DD is still treated as enabled.
				if (ddEnabledState->isEnabled() && (currentOwner != dataDistributionModeLock)) {
					TraceEvent(SevDebug, "IsDDEnabledSucceeded")
					    .detail("CurrentOwner", currentOwner)
					    .detail("DDModeLock", dataDistributionModeLock)
					    .detail("IsDDEnabled", ddEnabledState->isEnabled());
					return true;
				}
				TraceEvent(SevDebug, "IsDDEnabledFailed")
				    .detail("CurrentOwner", currentOwner)
				    .detail("DDModeLock", dataDistributionModeLock)
				    .detail("IsDDEnabled", ddEnabledState->isEnabled());
				return false;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
|
|
|
|
|
|
2022-08-02 04:26:58 +08:00
|
|
|
|
	// Periodically verifies that `lock` is still the valid moveKeys lock. Never returns
	// normally; checkMoveKeysLockReadOnly throws (e.g. movekeys_conflict) if the lock is
	// lost, which propagates to the caller.
	ACTOR static Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnabledState* ddEnabledState) {
		loop {
			wait(delay(SERVER_KNOBS->MOVEKEYS_LOCK_POLLING_DELAY));
			state Transaction tr(cx);
			// Inner loop retries transient transaction errors for a single check.
			loop {
				tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
				tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
				try {
					wait(checkMoveKeysLockReadOnly(&tr, lock, ddEnabledState));
					break;
				} catch (Error& e) {
					wait(tr.onError(e));
				}
			}
		}
	}
|
2022-09-22 05:58:34 +08:00
|
|
|
|
|
|
|
|
|
	// Reads the rebalanceDDIgnoreKey system key (used to suppress DD rebalancing),
	// retrying transient errors. Returns the raw value, absent if the key is not set.
	ACTOR static Future<Optional<Value>> readRebalanceDDIgnoreKey(Database cx) {
		state Transaction tr(cx);
		loop {
			try {
				tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
				tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
				tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

				Optional<Value> res = wait(tr.get(rebalanceDDIgnoreKey));
				return res;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
|
2022-09-27 14:00:31 +08:00
|
|
|
|
|
|
|
|
|
	// Waits for the triggerDDTeamInfoPrintKey watch to fire, i.e. until someone requests
	// that DD print its team info. The watch is registered by committing an (otherwise
	// empty) transaction, then the actor blocks on the watch future.
	ACTOR static Future<Void> waitDDTeamInfoPrintSignal(Database cx) {
		state ReadYourWritesTransaction tr(cx);
		loop {
			try {
				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				state Future<Void> watchFuture = tr.watch(triggerDDTeamInfoPrintKey);
				wait(tr.commit());
				wait(watchFuture);
				return Void();
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
|
2023-05-05 01:42:21 +08:00
|
|
|
|
|
|
|
|
|
	// Blocks until storage server `serverID` holds no data: canRemoveStorageServer
	// reports removable AND the in-memory shard tracker shows zero shards for it.
	// Re-checks every ALL_DATA_REMOVED_DELAY seconds.
	ACTOR static Future<Void> waitForAllDataRemoved(
	    Database cx,
	    UID serverID,
	    Version addedVersion,
	    Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure) {
		state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
		loop {
			try {
				tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
				tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				Version ver = wait(tr->getReadVersion());

				// we cannot remove a server immediately after adding it, because a perfectly timed cluster recovery
				// could cause us to not store the mutations sent to the short lived storage server.
				if (ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
					bool canRemove = wait(canRemoveStorageServer(tr, serverID));
					auto shards = shardsAffectedByTeamFailure->getNumberOfShards(serverID);
					TraceEvent(SevVerbose, "WaitForAllDataRemoved")
					    .detail("Server", serverID)
					    .detail("CanRemove", canRemove)
					    .detail("Shards", shards);
					ASSERT_GE(shards, 0);
					if (canRemove && shards == 0) {
						return Void();
					}
				}

				// Wait for any change to the serverKeys for this server
				wait(delay(SERVER_KNOBS->ALL_DATA_REMOVED_DELAY, TaskPriority::DataDistribution));
				tr->reset();
			} catch (Error& e) {
				wait(tr->onError(e));
			}
		}
	}
|
2022-06-10 03:16:12 +08:00
|
|
|
|
};
|
|
|
|
|
|
2023-05-05 01:42:21 +08:00
|
|
|
|
// Forwards to DDTxnProcessorImpl::waitForAllDataRemoved using this processor's database.
Future<Void> DDTxnProcessor::waitForAllDataRemoved(
    const UID& serverID,
    const Version& addedVersion,
    Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure) const {

	return DDTxnProcessorImpl::waitForAllDataRemoved(cx, serverID, addedVersion, shardsAffectedByTeamFailure);
}
|
|
|
|
|
|
2022-06-14 02:27:50 +08:00
|
|
|
|
// Returns the union of source servers plus the servers common to every shard
// intersecting `range`, via the transaction-based implementation.
Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
	return DDTxnProcessorImpl::getSourceServersForRange(cx, range);
}
|
|
|
|
|
|
2022-09-14 00:28:41 +08:00
|
|
|
|
// Resolves the storage server interfaces serving each shard intersecting
// `range`; delegates to the transaction-based implementation.
Future<std::vector<IDDTxnProcessor::DDRangeLocations>> DDTxnProcessor::getSourceServerInterfacesForRange(
    const KeyRangeRef range) {
	return DDTxnProcessorImpl::getSourceServerInterfacesForRange(cx, range);
}
|
|
|
|
|
|
2022-10-04 13:24:35 +08:00
|
|
|
|
// Fetches the cluster's storage server list with their process classes.
Future<ServerWorkerInfos> DDTxnProcessor::getServerListAndProcessClasses() {
	return DDTxnProcessorImpl::getServerListAndProcessClasses(cx);
}
|
|
|
|
|
|
2022-08-19 02:44:27 +08:00
|
|
|
|
// Acquires the global moveKeys lock for this data distributor instance.
Future<MoveKeysLock> DDTxnProcessor::takeMoveKeysLock(const UID& ddId) const {
	return ::takeMoveKeysLock(cx, ddId);
}
|
|
|
|
|
|
|
|
|
|
// Reads the current database configuration from the system keyspace.
Future<DatabaseConfiguration> DDTxnProcessor::getDatabaseConfiguration() const {
	return ::getDatabaseConfiguration(cx);
}
|
|
|
|
|
|
|
|
|
|
// Updates the per-datacenter replica keys to reflect the given configuration.
Future<Void> DDTxnProcessor::updateReplicaKeys(const std::vector<Optional<Key>>& primaryIds,
                                               const std::vector<Optional<Key>>& remoteIds,
                                               const DatabaseConfiguration& configuration) const {
	return DDTxnProcessorImpl::updateReplicaKeys(cx, primaryIds, remoteIds, configuration);
}
|
|
|
|
|
|
|
|
|
|
// Loads the initial shard-to-team assignment and server list from the real
// cluster; delegates to the transaction-based implementation.
Future<Reference<InitialDataDistribution>> DDTxnProcessor::getInitialDataDistribution(
    const UID& distributorId,
    const MoveKeysLock& moveKeysLock,
    const std::vector<Optional<Key>>& remoteDcIds,
    const DDEnabledState* ddEnabledState,
    SkipDDModeCheck skipDDModeCheck) {
	return DDTxnProcessorImpl::getInitialDataDistribution(
	    cx, distributorId, moveKeysLock, remoteDcIds, ddEnabledState, skipDDModeCheck);
}
|
|
|
|
|
|
2022-07-13 02:25:59 +08:00
|
|
|
|
// Blocks until data distribution is enabled cluster-wide.
Future<Void> DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const {
	return DDTxnProcessorImpl::waitForDataDistributionEnabled(cx, ddEnabledState);
}
|
2022-08-02 04:26:58 +08:00
|
|
|
|
|
2022-08-19 01:23:33 +08:00
|
|
|
|
// Checks whether data distribution is currently enabled.
Future<bool> DDTxnProcessor::isDataDistributionEnabled(const DDEnabledState* ddEnabledState) const {
	return DDTxnProcessorImpl::isDataDistributionEnabled(cx, ddEnabledState);
}
|
|
|
|
|
|
2022-08-19 02:44:27 +08:00
|
|
|
|
// Periodically verifies this distributor still holds the moveKeys lock.
Future<Void> DDTxnProcessor::pollMoveKeysLock(const MoveKeysLock& lock, const DDEnabledState* ddEnabledState) const {
	return DDTxnProcessorImpl::pollMoveKeysLock(cx, lock, ddEnabledState);
}
|
|
|
|
|
|
2022-09-20 05:41:34 +08:00
|
|
|
|
// Waits until the storage metrics of `keys` fall outside [min, max] (within
// permittedError); forwards directly to the native client implementation.
Future<std::pair<Optional<StorageMetrics>, int>> DDTxnProcessor::waitStorageMetrics(
    const KeyRange& keys,
    const StorageMetrics& min,
    const StorageMetrics& max,
    const StorageMetrics& permittedError,
    int shardLimit,
    int expectedShardCount) const {
	return cx->waitStorageMetrics(keys, min, max, permittedError, shardLimit, expectedShardCount);
}
|
|
|
|
|
|
|
|
|
|
// Computes split points for `keys` so each piece stays under `limit`;
// forwards to the native client implementation.
Future<Standalone<VectorRef<KeyRef>>> DDTxnProcessor::splitStorageMetrics(const KeyRange& keys,
                                                                          const StorageMetrics& limit,
                                                                          const StorageMetrics& estimated,
                                                                          const Optional<int>& minSplitBytes) const {
	return cx->splitStorageMetrics(keys, limit, estimated, minSplitBytes);
}
|
|
|
|
|
|
|
|
|
|
// Returns read-hot subranges of `keys`, as reported by the client.
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> DDTxnProcessor::getReadHotRanges(const KeyRange& keys) const {
	return cx->getReadHotRanges(keys);
}
|
2022-09-24 05:49:46 +08:00
|
|
|
|
|
2022-09-22 01:56:22 +08:00
|
|
|
|
// Fetches cluster health metrics; `detailed` requests per-process detail.
Future<HealthMetrics> DDTxnProcessor::getHealthMetrics(bool detailed) const {
	return cx->getHealthMetrics(detailed);
}
|
|
|
|
|
|
|
|
|
|
// Reads the rebalance-ignore key, which can disable DD rebalancing.
Future<Optional<Value>> DDTxnProcessor::readRebalanceDDIgnoreKey() const {
	return DDTxnProcessorImpl::readRebalanceDDIgnoreKey(cx);
}
|
2022-09-27 14:00:31 +08:00
|
|
|
|
|
|
|
|
|
// Attempts to update the replicas key for the given datacenter to match the
// storage team size; delegates to the transaction-based implementation.
Future<int> DDTxnProcessor::tryUpdateReplicasKeyForDc(const Optional<Key>& dcId, const int& storageTeamSize) const {
	return DDTxnProcessorImpl::tryUpdateReplicasKeyForDc(cx, dcId, storageTeamSize);
}
|
|
|
|
|
|
|
|
|
|
// Completes when the team-info-print trigger key changes.
Future<Void> DDTxnProcessor::waitDDTeamInfoPrintSignal() const {
	return DDTxnProcessorImpl::waitDDTeamInfoPrintSignal(cx);
}
|
2022-10-03 13:07:42 +08:00
|
|
|
|
|
2022-10-05 05:57:04 +08:00
|
|
|
|
// Fetches the cluster's worker process list.
Future<std::vector<ProcessData>> DDTxnProcessor::getWorkers() const {
	return ::getWorkers(cx);
}
|
|
|
|
|
|
2022-11-11 04:51:22 +08:00
|
|
|
|
// Begins a shard movement on the real cluster; fills `tssMapping` with any
// TSS pairings discovered while starting the move.
Future<Void> DDTxnProcessor::rawStartMovement(const MoveKeysParams& params,
                                              std::map<UID, StorageServerInterface>& tssMapping) {
	return ::rawStartMovement(cx, params, tssMapping);
}
|
|
|
|
|
|
2022-11-11 04:51:22 +08:00
|
|
|
|
// Completes a previously started shard movement on the real cluster.
Future<Void> DDTxnProcessor::rawFinishMovement(const MoveKeysParams& params,
                                               const std::map<UID, StorageServerInterface>& tssMapping) {
	return ::rawFinishMovement(cx, params, tssMapping);
}
|
|
|
|
|
|
|
|
|
|
// Static helpers backing DDMockTxnProcessor's simulated moveKeys flow.
struct DDMockTxnProcessorImpl {
	// return when all status become FETCHED
	// Polls once per (jittered) second until every destination server's shards
	// in `range` reach FETCHED or COMPLETED status.
	ACTOR static Future<Void> checkFetchingState(DDMockTxnProcessor* self, std::vector<UID> ids, KeyRangeRef range) {
		state TraceInterval interval("MockCheckFetchingState");
		TraceEvent(SevDebug, interval.begin()).detail("Range", range);
		loop {
			wait(delayJittered(1.0, TaskPriority::FetchKeys));
			bool done = true;
			for (auto& id : ids) {
				auto& server = self->mgs->allServers.at(id);
				if (!server->allShardStatusIn(range, { MockShardStatus::FETCHED, MockShardStatus::COMPLETED })) {
					done = false;
					break;
				}
			}
			if (done) {
				break;
			}
		}
		TraceEvent(SevDebug, interval.end()).log();
		return Void();
	}

	// Dispatches to checkFetchingState with the range taken from either
	// params.ranges (physical-shard mode) or params.keys (classic mode).
	static Future<Void> rawCheckFetchingState(DDMockTxnProcessor* self, const MoveKeysParams& params) {
		if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
			ASSERT(params.ranges.present());
			// TODO: make startMoveShards work with multiple ranges.
			ASSERT(params.ranges.get().size() == 1);
			return checkFetchingState(self, params.destinationTeam, params.ranges.get().at(0));
		}
		ASSERT(params.keys.present());
		return checkFetchingState(self, params.destinationTeam, params.keys.get());
	}

	// Full mock movement: start the move, wait for destinations to fetch the
	// data, then finish the move and signal completion.
	ACTOR static Future<Void> moveKeys(DDMockTxnProcessor* self, MoveKeysParams params) {
		state std::map<UID, StorageServerInterface> tssMapping;
		// Because SFBTF::Team requires the ID is ordered
		std::sort(params.destinationTeam.begin(), params.destinationTeam.end());
		std::sort(params.healthyDestinations.begin(), params.healthyDestinations.end());

		wait(self->rawStartMovement(params, tssMapping));
		// The mock has no TSS servers, so nothing should have been mapped.
		ASSERT(tssMapping.empty());

		wait(rawCheckFetchingState(self, params));

		wait(self->rawFinishMovement(params, tssMapping));
		if (!params.dataMovementComplete.isSet())
			params.dataMovementComplete.send(Void());
		return Void();
	}
};
|
|
|
|
|
|
2022-10-04 13:24:35 +08:00
|
|
|
|
// Builds the server/worker snapshot from the mock global state; every mock
// storage server is reported as a StorageClass process sourced from the DB.
Future<ServerWorkerInfos> DDMockTxnProcessor::getServerListAndProcessClasses() {
	ServerWorkerInfos infos;
	infos.servers.reserve(mgs->allServers.size());
	const ProcessClass storageClass(ProcessClass::StorageClass, ProcessClass::DBSource);
	for (const auto& [id, mockServer] : mgs->allServers) {
		infos.servers.emplace_back(mockServer->ssi, storageClass);
	}
	// FIXME(xwang): possible generate version from time?
	infos.readVersion = 0;
	return infos;
}
|
|
|
|
|
|
2022-09-14 15:17:00 +08:00
|
|
|
|
std::pair<std::set<std::vector<UID>>, std::set<std::vector<UID>>> getAllTeamsInRegion(
|
|
|
|
|
const std::vector<DDShardInfo>& shards) {
|
|
|
|
|
std::set<std::vector<UID>> primary, remote;
|
|
|
|
|
for (auto& info : shards) {
|
|
|
|
|
if (!info.primarySrc.empty())
|
|
|
|
|
primary.emplace(info.primarySrc);
|
|
|
|
|
if (!info.primaryDest.empty())
|
|
|
|
|
primary.emplace(info.primaryDest);
|
|
|
|
|
if (!info.remoteSrc.empty())
|
|
|
|
|
remote.emplace(info.remoteSrc);
|
|
|
|
|
if (!info.remoteDest.empty())
|
|
|
|
|
remote.emplace(info.remoteDest);
|
2022-07-26 14:25:08 +08:00
|
|
|
|
}
|
2022-09-14 15:17:00 +08:00
|
|
|
|
return { primary, remote };
|
2022-07-26 14:25:08 +08:00
|
|
|
|
}
|
|
|
|
|
|
2022-08-30 05:59:40 +08:00
|
|
|
|
inline void transformTeamsToServerIds(std::vector<ShardsAffectedByTeamFailure::Team>& teams,
|
2022-09-01 13:26:10 +08:00
|
|
|
|
std::vector<UID>& primaryIds,
|
|
|
|
|
std::vector<UID>& remoteIds) {
|
2022-08-30 05:59:40 +08:00
|
|
|
|
std::set<UID> primary, remote;
|
|
|
|
|
for (auto& team : teams) {
|
|
|
|
|
team.primary ? primary.insert(team.servers.begin(), team.servers.end())
|
|
|
|
|
: remote.insert(team.servers.begin(), team.servers.end());
|
|
|
|
|
}
|
|
|
|
|
primaryIds = std::vector<UID>(primary.begin(), primary.end());
|
|
|
|
|
remoteIds = std::vector<UID>(remote.begin(), remote.end());
|
|
|
|
|
}
|
|
|
|
|
|
2022-09-01 13:26:10 +08:00
|
|
|
|
// reconstruct DDShardInfos from shardMapping
// Walks every range in the shard mapping and emits one DDShardInfo per range:
// a static shard has only source teams; an in-flight shard has both source
// and destination teams. A sentinel entry at allKeys.end terminates the list.
std::vector<DDShardInfo> DDMockTxnProcessor::getDDShardInfos() const {
	std::vector<DDShardInfo> res;
	res.reserve(mgs->shardMapping->getNumberOfShards());
	auto allRange = mgs->shardMapping->getAllRanges();
	// The mapping must cover the entire key space.
	ASSERT(allRange.end().begin() == allKeys.end);
	for (auto it = allRange.begin(); it != allRange.end(); ++it) {
		// FIXME: now just use anonymousShardId
		KeyRangeRef curRange = it->range();
		DDShardInfo info(curRange.begin);

		// teams.first = destination teams, teams.second = source teams.
		auto teams = mgs->shardMapping->getTeamsForFirstShard(curRange);
		if (!teams.first.empty() && !teams.second.empty()) {
			CODE_PROBE(true, "Mock InitialDataDistribution In-Flight shard");
			info.hasDest = true;
			info.destId = anonymousShardId;
			info.srcId = anonymousShardId;
			transformTeamsToServerIds(teams.second, info.primarySrc, info.remoteSrc);
			transformTeamsToServerIds(teams.first, info.primaryDest, info.remoteDest);
		} else if (!teams.first.empty()) {
			CODE_PROBE(true, "Mock InitialDataDistribution Static shard");
			info.srcId = anonymousShardId;
			transformTeamsToServerIds(teams.first, info.primarySrc, info.remoteSrc);
		} else {
			// Every range must be owned by at least one team.
			ASSERT(false);
		}

		res.push_back(std::move(info));
	}
	// Sentinel marking the end of the keyspace.
	res.emplace_back(allKeys.end);

	return res;
}
|
|
|
|
|
|
|
|
|
|
// Synchronously assembles an InitialDataDistribution from the mock global
// state. distributorId, moveKeysLock, remoteDcIds, ddEnabledState and
// skipDDModeCheck are ignored by the mock.
Future<Reference<InitialDataDistribution>> DDMockTxnProcessor::getInitialDataDistribution(
    const UID& distributorId,
    const MoveKeysLock& moveKeysLock,
    const std::vector<Optional<Key>>& remoteDcIds,
    const DDEnabledState* ddEnabledState,
    SkipDDModeCheck skipDDModeCheck) {

	// FIXME: now we just ignore ddEnabledState and moveKeysLock, will fix it in the future
	Reference<InitialDataDistribution> res = makeReference<InitialDataDistribution>();
	res->mode = 1;
	// The mock server list is available immediately, so .get() is safe here.
	res->allServers = getServerListAndProcessClasses().get().servers;
	res->shards = getDDShardInfos();
	std::tie(res->primaryTeams, res->remoteTeams) = getAllTeamsInRegion(res->shards);
	return res;
}
|
2022-08-31 01:59:14 +08:00
|
|
|
|
|
|
|
|
|
// Not implemented in the mock. This path is only reached when a user excludes
// a failed IP:PORT via fdbcli, which the first version of the mock class does
// not support; hitting it is a logic error.
Future<Void> DDMockTxnProcessor::removeKeysFromFailedServer(const UID& serverID,
                                                            const std::vector<UID>& teamForDroppedRange,
                                                            const MoveKeysLock& lock,
                                                            const DDEnabledState* ddEnabledState) const {
	UNREACHABLE();
}
|
|
|
|
|
|
|
|
|
|
// Removes a fully drained mock storage server. The caller must have already
// moved every shard off the server (asserted below). tssPairID, lock and
// ddEnabledState are ignored by the mock.
Future<Void> DDMockTxnProcessor::removeStorageServer(const UID& serverID,
                                                     const Optional<UID>& tssPairID,
                                                     const MoveKeysLock& lock,
                                                     const DDEnabledState* ddEnabledState) const {
	ASSERT(mgs->allShardsRemovedFromServer(serverID));
	mgs->allServers.erase(serverID);
	return Void();
}
|
2022-09-14 05:39:12 +08:00
|
|
|
|
|
|
|
|
|
// Seeds the mock global state from an InitialDataDistribution: registers each
// server, then replays every shard assignment (and any in-flight move) into
// both the shard mapping (keyServers analogue) and each server's serverKeys.
void DDMockTxnProcessor::setupMockGlobalState(Reference<InitialDataDistribution> initData) {
	for (auto& [ssi, pInfo] : initData->allServers) {
		mgs->addStorageServer(ssi);
	}
	// Disable consistency checks while bulk-loading the mapping.
	mgs->shardMapping->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::ForceNoCheck);

	// The last entry of initData->shards is a sentinel (end key only), hence size() - 1.
	for (int i = 0; i < initData->shards.size() - 1; ++i) {
		// insert to keyServers
		auto& shardInfo = initData->shards[i];
		// The mock only models a single region.
		ASSERT(shardInfo.remoteSrc.empty() && shardInfo.remoteDest.empty());

		// Assign a random plausible size to each shard.
		uint64_t shardBytes =
		    deterministicRandom()->randomInt(SERVER_KNOBS->MIN_SHARD_BYTES, SERVER_KNOBS->MAX_SHARD_BYTES);
		KeyRangeRef keys(shardInfo.key, initData->shards[i + 1].key);
		mgs->shardMapping->assignRangeToTeams(keys, { { shardInfo.primarySrc, true } });
		if (shardInfo.hasDest) {
			// Replay an in-flight movement toward the destination team.
			mgs->shardMapping->moveShard(keys, { { shardInfo.primaryDest, true } });
		}
		// insert to serverKeys
		for (auto& id : shardInfo.primarySrc) {
			mgs->allServers.at(id)->serverKeys.insert(keys, { MockShardStatus::COMPLETED, shardBytes });
		}
		for (auto& id : shardInfo.primaryDest) {
			mgs->allServers.at(id)->serverKeys.insert(keys, { MockShardStatus::INFLIGHT, shardBytes });
		}
	}

	// Re-enable consistency checking now that the state is fully loaded.
	mgs->shardMapping->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::Normal);
}
|
2022-09-15 04:58:48 +08:00
|
|
|
|
|
2022-09-23 04:11:53 +08:00
|
|
|
|
// Runs the full mock movement flow (start, fetch, finish).
Future<Void> DDMockTxnProcessor::moveKeys(const MoveKeysParams& params) {
	// Not support location metadata yet
	ASSERT(!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
	return DDMockTxnProcessorImpl::moveKeys(this, params);
}
|
|
|
|
|
|
|
|
|
|
// FIXME: finish implementation
// NOTE(review): returns a default-constructed (never-ready) Future — callers
// waiting on this will block forever; acceptable only while unimplemented.
Future<HealthMetrics> DDMockTxnProcessor::getHealthMetrics(bool detailed) const {
	return Future<HealthMetrics>();
}
|
|
|
|
|
|
|
|
|
|
// Delegates split-point computation to the mock global state.
Future<Standalone<VectorRef<KeyRef>>> DDMockTxnProcessor::splitStorageMetrics(
    const KeyRange& keys,
    const StorageMetrics& limit,
    const StorageMetrics& estimated,
    const Optional<int>& minSplitBytes) const {
	return mgs->splitStorageMetrics(keys, limit, estimated, minSplitBytes);
}
|
|
|
|
|
|
|
|
|
|
// Delegates the metrics wait to the mock global state.
Future<std::pair<Optional<StorageMetrics>, int>> DDMockTxnProcessor::waitStorageMetrics(
    const KeyRange& keys,
    const StorageMetrics& min,
    const StorageMetrics& max,
    const StorageMetrics& permittedError,
    int shardLimit,
    int expectedShardCount) const {
	return mgs->waitStorageMetrics(keys, min, max, permittedError, shardLimit, expectedShardCount);
}
|
2022-10-05 05:57:04 +08:00
|
|
|
|
|
|
|
|
|
// FIXME: finish implementation
// NOTE(review): returns a default-constructed (never-ready) Future — callers
// waiting on this will block forever; acceptable only while unimplemented.
Future<std::vector<ProcessData>> DDMockTxnProcessor::getWorkers() const {
	return Future<std::vector<ProcessData>>();
}
|
2022-10-11 07:04:43 +08:00
|
|
|
|
|
2022-11-10 03:34:47 +08:00
|
|
|
|
// Mock analogue of ::rawStartMovement: records the start of a shard move in
// the mock global state. Defines the new shard boundary, retargets each
// intersecting sub-range to the destination team, and marks the range
// INFLIGHT on every destination server (kicking off simulated fetches).
ACTOR Future<Void> rawStartMovement(std::shared_ptr<MockGlobalState> mgs,
                                    MoveKeysParams params,
                                    std::map<UID, StorageServerInterface> tssMapping) {
	state TraceInterval interval("RelocateShard_MockStartMoveKeys");
	state KeyRange keys;
	// The moved range comes from params.ranges (physical-shard mode) or params.keys.
	if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
		ASSERT(params.ranges.present());
		// TODO: make startMoveShards work with multiple ranges.
		ASSERT(params.ranges.get().size() == 1);
		keys = params.ranges.get().at(0);
	} else {
		ASSERT(params.keys.present());
		keys = params.keys.get();
	}
	TraceEvent(SevDebug, interval.begin()).detail("Keys", keys);
	// Honor the same parallelism throttle as the real implementation.
	wait(params.startMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch));
	state FlowLock::Releaser releaser(*params.startMoveKeysParallelismLock);

	std::vector<ShardsAffectedByTeamFailure::Team> destTeams;
	destTeams.emplace_back(params.destinationTeam, true);
	// invariant: the splitting and merge operation won't happen at the same moveKeys action. For example, if [a,c) [c,
	// e) exists, the params.keys won't be [b, d).
	auto intersectRanges = mgs->shardMapping->intersectingRanges(keys);
	// 1. splitting or just move a range. The new boundary need to be defined in startMovement
	if (intersectRanges.begin().range().contains(keys)) {
		mgs->shardMapping->defineShard(keys);
	}
	// 2. merge ops will coalesce the boundary in finishMovement;
	intersectRanges = mgs->shardMapping->intersectingRanges(keys);
	// After defineShard, the moved range must align exactly with shard boundaries.
	ASSERT(keys.begin == intersectRanges.begin().begin());
	ASSERT(keys.end == intersectRanges.end().begin());

	int totalRangeSize = 0;
	for (auto it = intersectRanges.begin(); it != intersectRanges.end(); ++it) {
		auto teamPair = mgs->shardMapping->getTeamsFor(it->begin());
		// If a move is already in flight, its source teams are in .second.
		auto& srcTeams = teamPair.second.empty() ? teamPair.first : teamPair.second;
		totalRangeSize += mgs->getRangeSize(it->range());
		mgs->shardMapping->rawMoveShard(it->range(), srcTeams, destTeams);
	}

	// Mark the range in flight on each destination and start the mock fetch.
	for (auto& id : params.destinationTeam) {
		auto& server = mgs->allServers.at(id);
		server->setShardStatus(keys, MockShardStatus::INFLIGHT);
		server->signalFetchKeys(keys, totalRangeSize);
	}
	TraceEvent(SevDebug, interval.end());
	return Void();
}
|
|
|
|
|
|
2022-11-11 04:51:22 +08:00
|
|
|
|
// Starts a mock shard movement; tssMapping is unused (the mock has no TSS).
Future<Void> DDMockTxnProcessor::rawStartMovement(const MoveKeysParams& params,
                                                  std::map<UID, StorageServerInterface>& tssMapping) {
	return ::rawStartMovement(mgs, params, tssMapping);
}
|
|
|
|
|
|
2022-11-10 03:34:47 +08:00
|
|
|
|
// Mock analogue of ::rawFinishMovement: commits a started move. Marks the
// range COMPLETED on each destination server, drops it from sources that are
// not also destinations, then finalizes and re-coalesces the shard mapping.
ACTOR Future<Void> rawFinishMovement(std::shared_ptr<MockGlobalState> mgs,
                                     MoveKeysParams params,
                                     std::map<UID, StorageServerInterface> tssMapping) {
	state TraceInterval interval("RelocateShard_MockFinishMoveKeys");
	state KeyRange keys;
	// The moved range comes from params.ranges (physical-shard mode) or params.keys.
	if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
		ASSERT(params.ranges.present());
		// TODO: make startMoveShards work with multiple ranges.
		ASSERT(params.ranges.get().size() == 1);
		keys = params.ranges.get().at(0);
	} else {
		ASSERT(params.keys.present());
		keys = params.keys.get();
	}

	TraceEvent(SevDebug, interval.begin()).detail("Keys", keys);

	// Honor the same parallelism throttle as the real implementation.
	wait(params.finishMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch));
	state FlowLock::Releaser releaser(*params.finishMoveKeysParallelismLock);

	// get source and dest teams
	auto [destTeams, srcTeams] = mgs->shardMapping->getTeamsForFirstShard(keys);

	ASSERT_EQ(destTeams.size(), 1); // Will the multi-region or dynamic replica make destTeam.size() > 1?
	// The recorded destination must match what the caller asked for.
	if (destTeams.front() != ShardsAffectedByTeamFailure::Team{ params.destinationTeam, true }) {
		TraceEvent(SevError, "MockRawFinishMovementError")
		    .detail("Reason", "InconsistentDestinations")
		    .detail("ShardMappingDest", describe(destTeams.front().servers))
		    .detail("ParamDest", describe(params.destinationTeam));
		ASSERT(false); // This shouldn't happen because the overlapped key range movement won't be executed in parallel
	}

	for (auto& id : params.destinationTeam) {
		mgs->allServers.at(id)->setShardStatus(keys, MockShardStatus::COMPLETED);
	}

	// remove destination servers from source servers
	ASSERT_EQ(srcTeams.size(), 1);
	for (auto& id : srcTeams.front().servers) {
		// the only caller moveKeys will always make sure the UID are sorted
		if (!std::binary_search(params.destinationTeam.begin(), params.destinationTeam.end(), id)) {
			mgs->allServers.at(id)->removeShard(keys);
		}
	}
	mgs->shardMapping->finishMove(keys);
	mgs->shardMapping->defineShard(keys); // coalesce for merge
	TraceEvent(SevDebug, interval.end());
	return Void();
}
|
|
|
|
|
|
2022-11-11 04:51:22 +08:00
|
|
|
|
// Finishes a mock shard movement; tssMapping is unused (the mock has no TSS).
Future<Void> DDMockTxnProcessor::rawFinishMovement(const MoveKeysParams& params,
                                                   const std::map<UID, StorageServerInterface>& tssMapping) {
	return ::rawFinishMovement(mgs, params, tssMapping);
}
|
2023-03-23 02:36:13 +08:00
|
|
|
|
|
|
|
|
|
// Fetches storage statistics for one storage server from the client, allowing
// cached results up to `maxStaleness` old.
Future<Optional<HealthMetrics::StorageStats>> DDTxnProcessor::getStorageStats(const UID& id,
                                                                              double maxStaleness) const {
	return cx->getStorageStats(id, maxStaleness);
}
|
2023-04-09 09:11:12 +08:00
|
|
|
|
|
|
|
|
|
// Returns the mock server's storage stats, or an empty Optional when the
// server id is unknown. maxStaleness is ignored by the mock implementation.
Future<Optional<HealthMetrics::StorageStats>> DDMockTxnProcessor::getStorageStats(const UID& id,
                                                                                  double maxStaleness) const {
	Optional<HealthMetrics::StorageStats> stats;
	if (auto it = mgs->allServers.find(id); it != mgs->allServers.end()) {
		stats = it->second->getStorageStats();
	}
	return stats;
}
|
2023-05-04 07:43:52 +08:00
|
|
|
|
|
|
|
|
|
// Returns the configuration held by the mock global state (already ready).
Future<DatabaseConfiguration> DDMockTxnProcessor::getDatabaseConfiguration() const {
	return mgs->configuration;
}
|
2023-05-05 00:49:34 +08:00
|
|
|
|
|
|
|
|
|
// Computes, for the shards intersecting `keys`, the union of all source
// servers and the subset of servers common to every intersecting shard.
Future<IDDTxnProcessor::SourceServers> DDMockTxnProcessor::getSourceServersForRange(const KeyRangeRef keys) {
	std::set<UID> allSources;
	std::vector<UID> commonSources;
	int shardIndex = 0;
	auto intersecting = mgs->shardMapping->intersectingRanges(keys);
	for (auto shard = intersecting.begin(); shard != intersecting.end(); ++shard) {
		auto shardSources = mgs->shardMapping->getSourceServerIdsFor(shard->begin());
		ASSERT(!shardSources.empty());
		updateServersAndCompleteSources(allSources, commonSources, shardIndex, shardSources);
		++shardIndex;
	}
	ASSERT(!allSources.empty());
	return IDDTxnProcessor::SourceServers{ std::vector<UID>(allSources.begin(), allSources.end()), commonSources };
}
|
2023-05-05 01:42:21 +08:00
|
|
|
|
|
|
|
|
|
// Mock analogue of DDTxnProcessorImpl::waitForAllDataRemoved: completes once
// serverID owns no shards in the mock state AND the shard tracker reports
// zero shards for it (matching the real implementation's `shards == 0` check).
// addedVersion is ignored by the mock.
Future<Void> DDMockTxnProcessor::waitForAllDataRemoved(
    const UID& serverID,
    const Version& addedVersion,
    Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure) const {
	// Capture by value: checkUntil re-evaluates the predicate asynchronously
	// after this function has returned, so capturing the parameters by
	// reference (the original `[&, this]`) would leave dangling references.
	return checkUntil(
	    SERVER_KNOBS->ALL_DATA_REMOVED_DELAY,
	    [this, serverID, shardsAffectedByTeamFailure]() -> bool {
		    // Done only when the tracker reports ZERO shards; the previous
		    // truthiness test (`getNumberOfShards(serverID)`) was inverted and
		    // succeeded only while shards still remained.
		    return mgs->allShardsRemovedFromServer(serverID) &&
		           shardsAffectedByTeamFailure->getNumberOfShards(serverID) == 0;
	    },
	    TaskPriority::DataDistribution);
}
|