Merge pull request #7573 from sfc-gh-xwang/feature/dd-refactor-incremental
move getInitialDataDistribution to DDTxnProcessor
commit a2062df220
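
At a glance, the refactor below has three parts: the keyserver-scanning actor moves into DDTxnProcessorImpl, DDTxnProcessor exposes it by implementing a new pure virtual on IDDTxnProcessor, and DataDistributor now keeps the result in an initData member filled through loadInitialDataDistribution() instead of calling a free function in DataDistribution.actor.cpp. The following is only a minimal, self-contained sketch of that shape; the types are simplified stand-ins, not FoundationDB's real classes.

// Sketch of the interface/impl/caller split applied by this commit (stand-in types, no Flow actors).
#include <iostream>
#include <memory>
#include <vector>

struct InitialState { std::vector<int> shards; }; // stand-in for InitialDataDistribution

class ITxnProcessor { // stand-in for IDDTxnProcessor
public:
    virtual ~ITxnProcessor() = default;
    virtual InitialState getInitialState() = 0;
};

class TxnProcessorImpl { // stand-in for DDTxnProcessorImpl: owns the actual read logic
public:
    static InitialState getInitialState() { return InitialState{ { 1, 2, 3 } }; }
};

class TxnProcessor : public ITxnProcessor { // stand-in for DDTxnProcessor: thin forwarding layer
public:
    InitialState getInitialState() override { return TxnProcessorImpl::getInitialState(); }
};

struct Distributor { // stand-in for DataDistributor
    std::unique_ptr<ITxnProcessor> txnProcessor = std::make_unique<TxnProcessor>();
    InitialState initData; // previously a local in dataDistribution(); now a member, like self->initData

    void loadInitialState() { initData = txnProcessor->getInitialState(); }
};

int main() {
    Distributor dd;
    dd.loadInitialState();
    std::cout << "shards: " << dd.initData.shards.size() << "\n"; // prints "shards: 3"
    return 0;
}
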
fdbserver/DDTxnProcessor.actor.cpp

@@ -21,6 +21,7 @@
#include "fdbserver/DDTxnProcessor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/DataDistribution.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

class DDTxnProcessorImpl {

@@ -125,6 +126,245 @@ class DDTxnProcessorImpl {
        return Void();
    }

    // Read keyservers, return unique set of teams
    ACTOR static Future<Reference<InitialDataDistribution>> getInitialDataDistribution(
        Database cx,
        UID distributorId,
        MoveKeysLock moveKeysLock,
        std::vector<Optional<Key>> remoteDcIds,
        const DDEnabledState* ddEnabledState) {
        state Reference<InitialDataDistribution> result = makeReference<InitialDataDistribution>();
        state Key beginKey = allKeys.begin;

        state bool succeeded;

        state Transaction tr(cx);

        state std::map<UID, Optional<Key>> server_dc;
        state std::map<std::vector<UID>, std::pair<std::vector<UID>, std::vector<UID>>> team_cache;
        state std::vector<std::pair<StorageServerInterface, ProcessClass>> tss_servers;
        state int numDataMoves = 0;

        // Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure
        // causing entries to be duplicated
        loop {
            numDataMoves = 0;
            server_dc.clear();
            result->allServers.clear();
            tss_servers.clear();
            team_cache.clear();
            succeeded = false;
            try {
                // Read healthyZone value which is later used to determine on/off of failure triggered DD
                tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
                tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
                Optional<Value> val = wait(tr.get(healthyZoneKey));
                if (val.present()) {
                    auto p = decodeHealthyZoneValue(val.get());
                    if (p.second > tr.getReadVersion().get() || p.first == ignoreSSFailuresZoneString) {
                        result->initHealthyZoneValue = Optional<Key>(p.first);
                    } else {
                        result->initHealthyZoneValue = Optional<Key>();
                    }
                } else {
                    result->initHealthyZoneValue = Optional<Key>();
                }

                result->mode = 1;
                tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
                Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
                if (mode.present()) {
                    BinaryReader rd(mode.get(), Unversioned());
                    rd >> result->mode;
                }
                if (!result->mode || !ddEnabledState->isDDEnabled()) {
                    // DD can be disabled persistently (result->mode = 0) or transiently (isDDEnabled() = 0)
                    TraceEvent(SevDebug, "GetInitialDataDistribution_DisabledDD").log();
                    return result;
                }

                state Future<std::vector<ProcessData>> workers = getWorkers(&tr);
                state Future<RangeResult> serverList = tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
                wait(success(workers) && success(serverList));
                ASSERT(!serverList.get().more && serverList.get().size() < CLIENT_KNOBS->TOO_MANY);

                std::map<Optional<Standalone<StringRef>>, ProcessData> id_data;
                for (int i = 0; i < workers.get().size(); i++)
                    id_data[workers.get()[i].locality.processId()] = workers.get()[i];

                for (int i = 0; i < serverList.get().size(); i++) {
                    auto ssi = decodeServerListValue(serverList.get()[i].value);
                    if (!ssi.isTss()) {
                        result->allServers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
                        server_dc[ssi.id()] = ssi.locality.dcId();
                    } else {
                        tss_servers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
                    }
                }

                RangeResult dms = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
                ASSERT(!dms.more && dms.size() < CLIENT_KNOBS->TOO_MANY);
                for (int i = 0; i < dms.size(); ++i) {
                    auto dataMove = std::make_shared<DataMove>(decodeDataMoveValue(dms[i].value), true);
                    const DataMoveMetaData& meta = dataMove->meta;
                    for (const UID& id : meta.src) {
                        auto& dc = server_dc[id];
                        if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
                            dataMove->remoteSrc.push_back(id);
                        } else {
                            dataMove->primarySrc.push_back(id);
                        }
                    }
                    for (const UID& id : meta.dest) {
                        auto& dc = server_dc[id];
                        if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
                            dataMove->remoteDest.push_back(id);
                        } else {
                            dataMove->primaryDest.push_back(id);
                        }
                    }
                    std::sort(dataMove->primarySrc.begin(), dataMove->primarySrc.end());
                    std::sort(dataMove->remoteSrc.begin(), dataMove->remoteSrc.end());
                    std::sort(dataMove->primaryDest.begin(), dataMove->primaryDest.end());
                    std::sort(dataMove->remoteDest.begin(), dataMove->remoteDest.end());

                    auto ranges = result->dataMoveMap.intersectingRanges(meta.range);
                    for (auto& r : ranges) {
                        ASSERT(!r.value()->valid);
                    }
                    result->dataMoveMap.insert(meta.range, std::move(dataMove));
                    ++numDataMoves;
                }

                succeeded = true;

                break;
            } catch (Error& e) {
                wait(tr.onError(e));

                ASSERT(!succeeded); // We shouldn't be retrying if we have already started modifying result in this loop
                TraceEvent("GetInitialTeamsRetry", distributorId).log();
            }
        }

        // If keyServers is too large to read in a single transaction, then we will have to break this process up into
        // multiple transactions. In that case, each iteration should begin where the previous left off
        while (beginKey < allKeys.end) {
            TEST(beginKey > allKeys.begin); // Multi-transactional getInitialDataDistribution
            loop {
                succeeded = false;
                try {
                    tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
                    wait(checkMoveKeysLockReadOnly(&tr, moveKeysLock, ddEnabledState));
                    state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
                    ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
                    RangeResult keyServers = wait(krmGetRanges(&tr,
                        keyServersPrefix,
                        KeyRangeRef(beginKey, allKeys.end),
                        SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
                        SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
                    succeeded = true;

                    std::vector<UID> src, dest, last;
                    UID srcId, destId;

                    // for each range
                    for (int i = 0; i < keyServers.size() - 1; i++) {
                        decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest, srcId, destId);
                        DDShardInfo info(keyServers[i].key, srcId, destId);
                        if (remoteDcIds.size()) {
                            auto srcIter = team_cache.find(src);
                            if (srcIter == team_cache.end()) {
                                for (auto& id : src) {
                                    auto& dc = server_dc[id];
                                    if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
                                        info.remoteSrc.push_back(id);
                                    } else {
                                        info.primarySrc.push_back(id);
                                    }
                                }
                                result->primaryTeams.insert(info.primarySrc);
                                result->remoteTeams.insert(info.remoteSrc);
                                team_cache[src] = std::make_pair(info.primarySrc, info.remoteSrc);
                            } else {
                                info.primarySrc = srcIter->second.first;
                                info.remoteSrc = srcIter->second.second;
                            }
                            if (dest.size()) {
                                info.hasDest = true;
                                auto destIter = team_cache.find(dest);
                                if (destIter == team_cache.end()) {
                                    for (auto& id : dest) {
                                        auto& dc = server_dc[id];
                                        if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) !=
                                            remoteDcIds.end()) {
                                            info.remoteDest.push_back(id);
                                        } else {
                                            info.primaryDest.push_back(id);
                                        }
                                    }
                                    result->primaryTeams.insert(info.primaryDest);
                                    result->remoteTeams.insert(info.remoteDest);
                                    team_cache[dest] = std::make_pair(info.primaryDest, info.remoteDest);
                                } else {
                                    info.primaryDest = destIter->second.first;
                                    info.remoteDest = destIter->second.second;
                                }
                            }
                        } else {
                            info.primarySrc = src;
                            auto srcIter = team_cache.find(src);
                            if (srcIter == team_cache.end()) {
                                result->primaryTeams.insert(src);
                                team_cache[src] = std::pair<std::vector<UID>, std::vector<UID>>();
                            }
                            if (dest.size()) {
                                info.hasDest = true;
                                info.primaryDest = dest;
                                auto destIter = team_cache.find(dest);
                                if (destIter == team_cache.end()) {
                                    result->primaryTeams.insert(dest);
                                    team_cache[dest] = std::pair<std::vector<UID>, std::vector<UID>>();
                                }
                            }
                        }
                        result->shards.push_back(info);
                    }

                    ASSERT_GT(keyServers.size(), 0);
                    beginKey = keyServers.end()[-1].key;
                    break;
                } catch (Error& e) {
                    TraceEvent("GetInitialTeamsKeyServersRetry", distributorId).error(e);

                    wait(tr.onError(e));
                    ASSERT(!succeeded); // We shouldn't be retrying if we have already started modifying result in this
                                        // loop
                }
            }

            tr.reset();
        }

        // a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
        result->shards.push_back(DDShardInfo(allKeys.end));

        if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) {
            for (int shard = 0; shard < result->shards.size() - 1; ++shard) {
                const DDShardInfo& iShard = result->shards[shard];
                KeyRangeRef keys = KeyRangeRef(iShard.key, result->shards[shard + 1].key);
                result->dataMoveMap[keys.begin]->validateShard(iShard, keys);
            }
        }

        // add tss to server list AFTER teams are built
        for (auto& it : tss_servers) {
            result->allServers.push_back(it);
        }

        return result;
    }

    ACTOR static Future<Void> waitForDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
        state Transaction tr(cx);
        loop {

@@ -180,6 +420,14 @@ Future<Void> DDTxnProcessor::updateReplicaKeys(const std::vector<Optional<Key>>&
    return DDTxnProcessorImpl::updateReplicaKeys(cx, primaryIds, remoteIds, configuration);
}

Future<Reference<InitialDataDistribution>> DDTxnProcessor::getInitialDataDistribution(
    const UID& distributorId,
    const MoveKeysLock& moveKeysLock,
    const std::vector<Optional<Key>>& remoteDcIds,
    const DDEnabledState* ddEnabledState) {
    return DDTxnProcessorImpl::getInitialDataDistribution(cx, distributorId, moveKeysLock, remoteDcIds, ddEnabledState);
}

Future<Void> DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const {
    return DDTxnProcessorImpl::waitForDataDistributionEnabled(cx, ddEnabledState);
}

fdbserver/DataDistribution.actor.cpp

@@ -102,242 +102,6 @@ void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int pr
    }
}

// Read keyservers, return unique set of teams
ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Database cx,
    UID distributorId,
    MoveKeysLock moveKeysLock,
    std::vector<Optional<Key>> remoteDcIds,
    const DDEnabledState* ddEnabledState) {
    state Reference<InitialDataDistribution> result = makeReference<InitialDataDistribution>();
    state Key beginKey = allKeys.begin;

    state bool succeeded;

    state Transaction tr(cx);

    state std::map<UID, Optional<Key>> server_dc;
    state std::map<std::vector<UID>, std::pair<std::vector<UID>, std::vector<UID>>> team_cache;
    state std::vector<std::pair<StorageServerInterface, ProcessClass>> tss_servers;
    state int numDataMoves = 0;

    // Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure
    // causing entries to be duplicated
    loop {
        numDataMoves = 0;
        server_dc.clear();
        result->allServers.clear();
        tss_servers.clear();
        team_cache.clear();
        succeeded = false;
        try {
            // Read healthyZone value which is later used to determine on/off of failure triggered DD
            tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
            Optional<Value> val = wait(tr.get(healthyZoneKey));
            if (val.present()) {
                auto p = decodeHealthyZoneValue(val.get());
                if (p.second > tr.getReadVersion().get() || p.first == ignoreSSFailuresZoneString) {
                    result->initHealthyZoneValue = Optional<Key>(p.first);
                } else {
                    result->initHealthyZoneValue = Optional<Key>();
                }
            } else {
                result->initHealthyZoneValue = Optional<Key>();
            }

            result->mode = 1;
            tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
            Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
            if (mode.present()) {
                BinaryReader rd(mode.get(), Unversioned());
                rd >> result->mode;
            }
            if (!result->mode || !ddEnabledState->isDDEnabled()) {
                // DD can be disabled persistently (result->mode = 0) or transiently (isDDEnabled() = 0)
                TraceEvent(SevDebug, "GetInitialDataDistribution_DisabledDD").log();
                return result;
            }

            state Future<std::vector<ProcessData>> workers = getWorkers(&tr);
            state Future<RangeResult> serverList = tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
            wait(success(workers) && success(serverList));
            ASSERT(!serverList.get().more && serverList.get().size() < CLIENT_KNOBS->TOO_MANY);

            std::map<Optional<Standalone<StringRef>>, ProcessData> id_data;
            for (int i = 0; i < workers.get().size(); i++)
                id_data[workers.get()[i].locality.processId()] = workers.get()[i];

            for (int i = 0; i < serverList.get().size(); i++) {
                auto ssi = decodeServerListValue(serverList.get()[i].value);
                if (!ssi.isTss()) {
                    result->allServers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
                    server_dc[ssi.id()] = ssi.locality.dcId();
                } else {
                    tss_servers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
                }
            }

            RangeResult dms = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
            ASSERT(!dms.more && dms.size() < CLIENT_KNOBS->TOO_MANY);
            for (int i = 0; i < dms.size(); ++i) {
                auto dataMove = std::make_shared<DataMove>(decodeDataMoveValue(dms[i].value), true);
                const DataMoveMetaData& meta = dataMove->meta;
                for (const UID& id : meta.src) {
                    auto& dc = server_dc[id];
                    if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
                        dataMove->remoteSrc.push_back(id);
                    } else {
                        dataMove->primarySrc.push_back(id);
                    }
                }
                for (const UID& id : meta.dest) {
                    auto& dc = server_dc[id];
                    if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
                        dataMove->remoteDest.push_back(id);
                    } else {
                        dataMove->primaryDest.push_back(id);
                    }
                }
                std::sort(dataMove->primarySrc.begin(), dataMove->primarySrc.end());
                std::sort(dataMove->remoteSrc.begin(), dataMove->remoteSrc.end());
                std::sort(dataMove->primaryDest.begin(), dataMove->primaryDest.end());
                std::sort(dataMove->remoteDest.begin(), dataMove->remoteDest.end());

                auto ranges = result->dataMoveMap.intersectingRanges(meta.range);
                for (auto& r : ranges) {
                    ASSERT(!r.value()->valid);
                }
                result->dataMoveMap.insert(meta.range, std::move(dataMove));
                ++numDataMoves;
            }

            succeeded = true;

            break;
        } catch (Error& e) {
            wait(tr.onError(e));

            ASSERT(!succeeded); // We shouldn't be retrying if we have already started modifying result in this loop
            TraceEvent("GetInitialTeamsRetry", distributorId).log();
        }
    }

    // If keyServers is too large to read in a single transaction, then we will have to break this process up into
    // multiple transactions. In that case, each iteration should begin where the previous left off
    while (beginKey < allKeys.end) {
        TEST(beginKey > allKeys.begin); // Multi-transactional getInitialDataDistribution
        loop {
            succeeded = false;
            try {
                tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
                wait(checkMoveKeysLockReadOnly(&tr, moveKeysLock, ddEnabledState));
                state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
                ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
                RangeResult keyServers = wait(krmGetRanges(&tr,
                    keyServersPrefix,
                    KeyRangeRef(beginKey, allKeys.end),
                    SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
                    SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
                succeeded = true;

                std::vector<UID> src, dest, last;
                UID srcId, destId;

                // for each range
                for (int i = 0; i < keyServers.size() - 1; i++) {
                    decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest, srcId, destId);
                    DDShardInfo info(keyServers[i].key, srcId, destId);
                    if (remoteDcIds.size()) {
                        auto srcIter = team_cache.find(src);
                        if (srcIter == team_cache.end()) {
                            for (auto& id : src) {
                                auto& dc = server_dc[id];
                                if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
                                    info.remoteSrc.push_back(id);
                                } else {
                                    info.primarySrc.push_back(id);
                                }
                            }
                            result->primaryTeams.insert(info.primarySrc);
                            result->remoteTeams.insert(info.remoteSrc);
                            team_cache[src] = std::make_pair(info.primarySrc, info.remoteSrc);
                        } else {
                            info.primarySrc = srcIter->second.first;
                            info.remoteSrc = srcIter->second.second;
                        }
                        if (dest.size()) {
                            info.hasDest = true;
                            auto destIter = team_cache.find(dest);
                            if (destIter == team_cache.end()) {
                                for (auto& id : dest) {
                                    auto& dc = server_dc[id];
                                    if (std::find(remoteDcIds.begin(), remoteDcIds.end(), dc) != remoteDcIds.end()) {
                                        info.remoteDest.push_back(id);
                                    } else {
                                        info.primaryDest.push_back(id);
                                    }
                                }
                                result->primaryTeams.insert(info.primaryDest);
                                result->remoteTeams.insert(info.remoteDest);
                                team_cache[dest] = std::make_pair(info.primaryDest, info.remoteDest);
                            } else {
                                info.primaryDest = destIter->second.first;
                                info.remoteDest = destIter->second.second;
                            }
                        }
                    } else {
                        info.primarySrc = src;
                        auto srcIter = team_cache.find(src);
                        if (srcIter == team_cache.end()) {
                            result->primaryTeams.insert(src);
                            team_cache[src] = std::pair<std::vector<UID>, std::vector<UID>>();
                        }
                        if (dest.size()) {
                            info.hasDest = true;
                            info.primaryDest = dest;
                            auto destIter = team_cache.find(dest);
                            if (destIter == team_cache.end()) {
                                result->primaryTeams.insert(dest);
                                team_cache[dest] = std::pair<std::vector<UID>, std::vector<UID>>();
                            }
                        }
                    }
                    result->shards.push_back(info);
                }

                ASSERT_GT(keyServers.size(), 0);
                beginKey = keyServers.end()[-1].key;
                break;
            } catch (Error& e) {
                TraceEvent("GetInitialTeamsKeyServersRetry", distributorId).error(e);

                wait(tr.onError(e));
                ASSERT(!succeeded); // We shouldn't be retrying if we have already started modifying result in this loop
            }
        }

        tr.reset();
    }

    // a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
    result->shards.push_back(DDShardInfo(allKeys.end));

    if (CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) {
        for (int shard = 0; shard < result->shards.size() - 1; ++shard) {
            const DDShardInfo& iShard = result->shards[shard];
            KeyRangeRef keys = KeyRangeRef(iShard.key, result->shards[shard + 1].key);
            result->dataMoveMap[keys.begin]->validateShard(iShard, keys);
        }
    }

    // add tss to server list AFTER teams are built
    for (auto& it : tss_servers) {
        result->allServers.push_back(it);
    }

    return result;
}

// add server to wiggling queue
void StorageWiggler::addServer(const UID& serverId, const StorageMetadataType& metadata) {
    // std::cout << "size: " << pq_handles.size() << " add " << serverId.toString() << " DC: "

@@ -537,6 +301,7 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
    DatabaseConfiguration configuration;
    std::vector<Optional<Key>> primaryDcId;
    std::vector<Optional<Key>> remoteDcIds;
    Reference<InitialDataDistribution> initData;

    Reference<EventCacheHolder> initialDDEventHolder;
    Reference<EventCacheHolder> movingDataEventHolder;

@@ -559,6 +324,16 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
    Future<Void> updateReplicaKeys() {
        return txnProcessor->updateReplicaKeys(primaryDcId, remoteDcIds, configuration);
    }

    Future<Void> loadInitialDataDistribution(const DDEnabledState* ddEnabledState) {
        return store(initData,
            txnProcessor->getInitialDataDistribution(
                ddId,
                lock,
                configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(),
                ddEnabledState));
    }

    void initDcInfo() {
        primaryDcId.clear();
        remoteDcIds.clear();

@@ -591,7 +366,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,

    // wait(debugCheckCoalescing(cx));
    // FIXME: wrap the bootstrap process into class DataDistributor
    state Reference<InitialDataDistribution> initData;
    state Reference<DDTeamCollection> primaryTeamCollection;
    state Reference<DDTeamCollection> remoteTeamCollection;
    state bool trackerCancelled;

@@ -616,19 +390,14 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
            wait(self->updateReplicaKeys());
            TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log();

            Reference<InitialDataDistribution> initData_ = wait(getInitialDataDistribution(
                cx,
                self->ddId,
                self->lock,
                self->configuration.usableRegions > 1 ? self->remoteDcIds : std::vector<Optional<Key>>(),
                ddEnabledState));
            initData = initData_;
            if (initData->shards.size() > 1) {
            wait(self->loadInitialDataDistribution(ddEnabledState));

            if (self->initData->shards.size() > 1) {
                TraceEvent("DDInitGotInitialDD", self->ddId)
                    .detail("B", initData->shards.end()[-2].key)
                    .detail("E", initData->shards.end()[-1].key)
                    .detail("Src", describe(initData->shards.end()[-2].primarySrc))
                    .detail("Dest", describe(initData->shards.end()[-2].primaryDest))
                    .detail("B", self->initData->shards.end()[-2].key)
                    .detail("E", self->initData->shards.end()[-1].key)
                    .detail("Src", describe(self->initData->shards.end()[-2].primarySrc))
                    .detail("Dest", describe(self->initData->shards.end()[-2].primaryDest))
                    .trackLatest(self->initialDDEventHolder->trackingKey);
            } else {
                TraceEvent("DDInitGotInitialDD", self->ddId)

@@ -639,7 +408,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
                    .trackLatest(self->initialDDEventHolder->trackingKey);
            }

            if (initData->mode && ddEnabledState->isDDEnabled()) {
            if (self->initData->mode && ddEnabledState->isDDEnabled()) {
                // mode may be set true by system operator using fdbcli and isDDEnabled() set to true
                break;
            }

@@ -705,9 +474,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
            state Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure(new ShardsAffectedByTeamFailure);

            state int shard = 0;
            for (; shard < initData->shards.size() - 1; shard++) {
                const DDShardInfo& iShard = initData->shards[shard];
                KeyRangeRef keys = KeyRangeRef(iShard.key, initData->shards[shard + 1].key);
            for (; shard < self->initData->shards.size() - 1; shard++) {
                const DDShardInfo& iShard = self->initData->shards[shard];
                KeyRangeRef keys = KeyRangeRef(iShard.key, self->initData->shards[shard + 1].key);

                shardsAffectedByTeamFailure->defineShard(keys);
                std::vector<ShardsAffectedByTeamFailure::Team> teams;

@@ -744,8 +513,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
                wait(yield(TaskPriority::DataDistribution));
            }

            state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = initData->dataMoveMap.ranges().begin();
            for (; it != initData->dataMoveMap.ranges().end(); ++it) {
            state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = self->initData->dataMoveMap.ranges().begin();
            for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
                const DataMoveMetaData& meta = it.value()->meta;
                if (it.value()->isCancelled() || (it.value()->valid && !CLIENT_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
                    RelocateShard rs(meta.range, SERVER_KNOBS->PRIORITY_RECOVER_MOVE, RelocateReason::OTHER);

@@ -804,7 +573,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
            }

            actors.push_back(pollMoveKeysLock(cx, self->lock, ddEnabledState));
            actors.push_back(reportErrorsExcept(dataDistributionTracker(initData,
            actors.push_back(reportErrorsExcept(dataDistributionTracker(self->initData,
                cx,
                output,
                shardsAffectedByTeamFailure,

@@ -882,7 +651,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
            teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
            remoteTeamCollection->teamCollections = teamCollectionsPtrs;
            actors.push_back(reportErrorsExcept(
                DDTeamCollection::run(remoteTeamCollection, initData, tcis[1], recruitStorage, *ddEnabledState),
                DDTeamCollection::run(
                    remoteTeamCollection, self->initData, tcis[1], recruitStorage, *ddEnabledState),
                "DDTeamCollectionSecondary",
                self->ddId,
                &normalDDQueueErrors()));

@@ -891,7 +661,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
            primaryTeamCollection->teamCollections = teamCollectionsPtrs;
            self->teamCollection = primaryTeamCollection.getPtr();
            actors.push_back(reportErrorsExcept(
                DDTeamCollection::run(primaryTeamCollection, initData, tcis[0], recruitStorage, *ddEnabledState),
                DDTeamCollection::run(primaryTeamCollection, self->initData, tcis[0], recruitStorage, *ddEnabledState),
                "DDTeamCollectionPrimary",
                self->ddId,
                &normalDDQueueErrors()));

fdbserver/DDTxnProcessor.h

@@ -24,6 +24,8 @@
#include "fdbserver/Knobs.h"
#include "fdbserver/MoveKeys.actor.h"

struct InitialDataDistribution;

/* Testability Contract:
 * a. The DataDistributor has to use this interface to interact with data-plane (aka. run transaction), because the
 * testability benefits from a mock implementation; b. Other control-plane roles should consider providing its own

@@ -40,6 +42,12 @@ public:
    // get the storage server list and Process class
    virtual Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() = 0;

    virtual Future<Reference<InitialDataDistribution>> getInitialDataDistribution(
        const UID& distributorId,
        const MoveKeysLock& moveKeysLock,
        const std::vector<Optional<Key>>& remoteDcIds,
        const DDEnabledState* ddEnabledState) = 0;

    virtual ~IDDTxnProcessor() = default;

    [[nodiscard]] virtual Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); }

@@ -71,6 +79,12 @@ public:
    // Call NativeAPI implementation directly
    Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() override;

    Future<Reference<InitialDataDistribution>> getInitialDataDistribution(
        const UID& distributorId,
        const MoveKeysLock& moveKeysLock,
        const std::vector<Optional<Key>>& remoteDcIds,
        const DDEnabledState* ddEnabledState) override;

    Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const override;

    Future<DatabaseConfiguration> getDatabaseConfiguration() const override;
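
The "Testability Contract" comment in DDTxnProcessor.h above is the motivation for routing this call through an interface: once getInitialDataDistribution is a virtual on IDDTxnProcessor, the data distributor's bootstrap logic can be driven by a mock processor that never opens a real transaction. No such mock is added by this commit; the following is only a minimal, self-contained illustration of the idea, again with simplified stand-in types rather than FoundationDB's real classes.

// Hypothetical mock processor for unit-testing the caller side (stand-in types, no Flow actors).
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

struct InitialState { std::vector<int> shards; }; // stand-in for InitialDataDistribution

class ITxnProcessor { // stand-in for IDDTxnProcessor
public:
    virtual ~ITxnProcessor() = default;
    virtual InitialState getInitialState() = 0;
};

class MockTxnProcessor : public ITxnProcessor { // returns canned data instead of reading system keys
public:
    explicit MockTxnProcessor(InitialState canned) : canned_(std::move(canned)) {}
    InitialState getInitialState() override { return canned_; }

private:
    InitialState canned_;
};

struct Distributor { // stand-in for DataDistributor
    std::unique_ptr<ITxnProcessor> txnProcessor;
    InitialState initData;
    void load() { initData = txnProcessor->getInitialState(); }
};

int main() {
    Distributor dd;
    dd.txnProcessor = std::make_unique<MockTxnProcessor>(InitialState{ { 10, 20 } });
    dd.load();
    assert(dd.initData.shards.size() == 2); // bootstrap logic exercised without a cluster
    return 0;
}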