extract startMoveKeysTransaction method

This commit is contained in:
Xiaoxi Wang 2022-10-10 16:04:43 -07:00
parent 914dfd7438
commit e959bdef8d
5 changed files with 261 additions and 172 deletions

View File

@ -681,6 +681,36 @@ Future<std::vector<ProcessData>> DDTxnProcessor::getWorkers() const {
return ::getWorkers(cx);
}
Future<Void> DDTxnProcessor::rawStartMovement(MoveKeysParams& params,
std::map<UID, StorageServerInterface>& tssMapping) {
UNREACHABLE();
}
Future<Void> DDTxnProcessor::rawFinishMovement(MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) {
UNREACHABLE();
}
struct DDMockTxnProcessorImpl {
ACTOR static Future<Void> moveKeys(DDMockTxnProcessor* self, MoveKeysParams params) {
state std::map<UID, StorageServerInterface> tssMapping; // Not used at all
Future<Void> startF = self->rawStartMovement(params, tssMapping);
ASSERT(startF.isReady());
ASSERT(tssMapping.empty());
if (BUGGIFY_WITH_PROB(0.5)) {
wait(delayJittered(5.0));
}
Future<Void> finishF = self->rawFinishMovement(params, tssMapping);
ASSERT(finishF.isReady());
if (!params.dataMovementComplete.isSet())
params.dataMovementComplete.send(Void());
return Void();
}
};
Future<ServerWorkerInfos> DDMockTxnProcessor::getServerListAndProcessClasses() {
ServerWorkerInfos res;
for (auto& [_, mss] : mgs->allServers) {
@ -819,7 +849,9 @@ void DDMockTxnProcessor::setupMockGlobalState(Reference<InitialDataDistribution>
// FIXME: finish moveKeys implementation
Future<Void> DDMockTxnProcessor::moveKeys(const MoveKeysParams& params) {
UNREACHABLE();
// Location metadata is not supported yet
ASSERT(!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
return DDMockTxnProcessorImpl::moveKeys(this, params);
}
// FIXME: finish implementation
@ -851,3 +883,22 @@ Future<std::pair<Optional<StorageMetrics>, int>> DDMockTxnProcessor::waitStorage
Future<std::vector<ProcessData>> DDMockTxnProcessor::getWorkers() const {
return Future<std::vector<ProcessData>>();
}
Future<Void> DDMockTxnProcessor::rawStartMovement(MoveKeysParams& params,
std::map<UID, StorageServerInterface>& tssMapping) {
std::vector<ShardsAffectedByTeamFailure::Team> destTeams;
destTeams.emplace_back(params.destinationTeam, true);
mgs->shardMapping->moveShard(params.keys, destTeams);
for (auto& id : params.destinationTeam) {
mgs->allServers.at(id).setShardStatus(params.keys, MockShardStatus::INFLIGHT);
}
return Void();
}
Future<Void> DDMockTxnProcessor::rawFinishMovement(MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) {
// get source and dest teams
auto [destTeams, srcTeams] = mgs->shardMapping->getTeamsFor(params.keys);
return Void();
}
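As an aside for readers new to the mock layer: the two raw phases above amount to a simple state machine over per-server shard status. The following standalone sketch (plain C++ with illustrative names only, not the FDB types) models that intent, assuming the start phase marks the range INFLIGHT on each destination and a finish phase would promote it to a completed state and drop it from sources that no longer own it.
// Standalone illustration only -- simplified stand-ins, not the FDB types.
#include <cassert>
#include <map>
#include <string>
#include <vector>
enum class ShardStatus { InFlight, Complete };
struct MockServer {
	std::map<std::string, ShardStatus> shards; // range begin key -> status on this server
	void setShardStatus(const std::string& rangeBegin, ShardStatus s) { shards[rangeBegin] = s; }
};
struct MockCluster {
	std::map<int, MockServer> servers; // server id -> server
	std::map<std::string, std::vector<int>> shardMapping; // range begin key -> owning team
	// Start phase: record the destination team and mark the range INFLIGHT on each destination.
	void startMovement(const std::string& rangeBegin, const std::vector<int>& destTeam) {
		shardMapping[rangeBegin] = destTeam;
		for (int id : destTeam)
			servers.at(id).setShardStatus(rangeBegin, ShardStatus::InFlight);
	}
	// Finish phase (sketch): promote destinations to COMPLETE and drop the range from old owners.
	void finishMovement(const std::string& rangeBegin, const std::vector<int>& oldTeam) {
		const std::vector<int>& dest = shardMapping.at(rangeBegin);
		for (int id : dest)
			servers.at(id).setShardStatus(rangeBegin, ShardStatus::Complete);
		for (int id : oldTeam) {
			bool stillOwner = false;
			for (int d : dest)
				stillOwner = stillOwner || (d == id);
			if (!stillOwner)
				servers.at(id).shards.erase(rangeBegin);
		}
	}
};
int main() {
	MockCluster c;
	c.servers[1].setShardStatus("a", ShardStatus::Complete); // server 1 currently owns range "a"
	c.servers[2];
	c.servers[3];
	c.shardMapping["a"] = { 1 };
	c.startMovement("a", { 2, 3 });
	assert(c.servers.at(2).shards.at("a") == ShardStatus::InFlight);
	c.finishMovement("a", { 1 });
	assert(c.servers.at(1).shards.count("a") == 0);
	assert(c.servers.at(3).shards.at("a") == ShardStatus::Complete);
	return 0;
}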

View File

@ -31,6 +31,8 @@ bool MockStorageServer::allShardStatusEqual(KeyRangeRef range, MockShardStatus s
return true;
}
void MockStorageServer::setShardStatus(KeyRangeRef range, MockShardStatus status) {}
void MockGlobalState::initializeAsEmptyDatabaseMGS(const DatabaseConfiguration& conf, uint64_t defaultDiskSpace) {
ASSERT(conf.storageTeamSize > 0);
configuration = conf;

View File

@ -578,20 +578,191 @@ ACTOR Future<Void> logWarningAfter(const char* context, double duration, std::ve
}
}
struct MoveKeysBatchInfo {
Key batchEnd;
int batchShards = 0;
};
// keyServers: map from keys to destination servers.
// serverKeys: two-dimensional map [servers][keys]; the value is the server's state of having the keys:
//   active (does not have them yet), complete (already has them), "" (none).
// This transaction:
//   - sets keyServers[keys].dest = servers;
//   - sets serverKeys[servers][keys] = active for each subrange of keys that the server did not already have,
//     and complete for each subrange that it already has;
//   - sets serverKeys[dest][keys] = "" for the dest servers of each existing shard in keys (unless that
//     destination is a member of servers OR the source list is sufficiently degraded).
ACTOR static Future<MoveKeysBatchInfo> startMoveKeysTransaction(Database occ,
KeyRange keys,
std::vector<UID>* servers,
MoveKeysLock lock,
UID relocationIntervalId,
std::map<UID, StorageServerInterface>* tssMapping,
const DDEnabledState* ddEnabledState,
bool loadedTssMapping,
Key begin,
int shards) {
state int retries = 0;
// RYW to optimize re-reading the same key ranges
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(occ);
loop {
try {
retries++;
// Keep track of old dests that may need to have ranges removed from serverKeys
state std::set<UID> oldDests;
// Keep track of shards for all src servers so that we can preserve their values in serverKeys
state Map<UID, VectorRef<KeyRangeRef>> shardMap;
tr->getTransaction().trState->taskID = TaskPriority::MoveKeys;
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(checkMoveKeysLock(&(tr->getTransaction()), lock, ddEnabledState));
if (!loadedTssMapping) {
// share transaction for loading tss mapping with the rest of start move keys
wait(readTSSMappingRYW(tr, tssMapping));
loadedTssMapping = true;
}
std::vector<Future<Optional<Value>>> serverListEntries;
serverListEntries.reserve(servers->size());
for (int s = 0; s < servers->size(); s++)
serverListEntries.push_back(tr->get(serverListKeyFor(servers->at(s))));
state std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
// Attempt to move onto a server that isn't in serverList (removed or never added to the
// database) This can happen (why?) and is handled by the data distribution algorithm
// FIXME: Answer why this can happen?
CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare);
throw move_to_removed_server();
}
}
// Get all existing shards overlapping keys (exclude any that have been processed in a previous
// iteration of the outer loop)
state KeyRange currentKeys = KeyRangeRef(begin, keys.end);
state RangeResult old = wait(krmGetRanges(tr,
keyServersPrefix,
currentKeys,
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
// Determine the last processed key (which will be the beginning for the next iteration)
state Key endKey = old.end()[-1].key;
currentKeys = KeyRangeRef(currentKeys.begin, endKey);
// TraceEvent("StartMoveKeysBatch", relocationIntervalId)
// .detail("KeyBegin", currentKeys.begin.toString())
// .detail("KeyEnd", currentKeys.end.toString());
// printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(),
// keys.end.toString().c_str(), old.size(), servers.size()); for(int i=0; i<old.size(); i++)
// printf("'%s': '%s'\n", old[i].key.toString().c_str(), old[i].value.toString().c_str());
// Check that enough servers for each shard are in the correct state
state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
std::vector<std::vector<UID>> addAsSource = wait(additionalSources(
old, tr, servers->size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER * servers->size()));
// For each intersecting range, update keyServers[range] dest to be servers and clear existing dest
// servers from serverKeys
for (int i = 0; i < old.size() - 1; ++i) {
KeyRangeRef rangeIntersectKeys(old[i].key, old[i + 1].key);
std::vector<UID> src;
std::vector<UID> dest;
decodeKeyServersValue(UIDtoTagMap, old[i].value, src, dest);
// TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
// .detail("KeyBegin", rangeIntersectKeys.begin.toString())
// .detail("KeyEnd", rangeIntersectKeys.end.toString())
// .detail("OldSrc", describe(src))
// .detail("OldDest", describe(dest))
// .detail("ReadVersion", tr->getReadVersion().get());
for (auto& uid : addAsSource[i]) {
src.push_back(uid);
}
uniquify(src);
// Update dest servers for this range to be equal to servers
krmSetPreviouslyEmptyRange(&(tr->getTransaction()),
keyServersPrefix,
rangeIntersectKeys,
keyServersValue(UIDtoTagMap, src, *servers),
old[i + 1].value);
// Track old destination servers. They may be removed from serverKeys soon, since they are
// about to be overwritten in keyServers
for (auto s = dest.begin(); s != dest.end(); ++s) {
oldDests.insert(*s);
// TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s);
}
// Keep track of src shards so that we can preserve their values when we overwrite serverKeys
for (auto& uid : src) {
shardMap[uid].push_back(old.arena(), rangeIntersectKeys);
// TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid);
}
}
state std::set<UID>::iterator oldDest;
// Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the
// same prefix for a single transaction, we must do most of the coalescing ourselves. Only the
// shards on the boundary of currentRange are actually coalesced with the ranges outside of
// currentRange. For all shards internal to currentRange, we overwrite all consecutive keys whose
// value is or should be serverKeysFalse in a single write
std::vector<Future<Void>> actors;
for (oldDest = oldDests.begin(); oldDest != oldDests.end(); ++oldDest)
if (std::find(servers->begin(), servers->end(), *oldDest) == servers->end())
actors.push_back(removeOldDestinations(tr, *oldDest, shardMap[*oldDest], currentKeys));
// Update serverKeys to include keys (or the currently processed subset of keys) for each SS in
// servers
for (int i = 0; i < servers->size(); i++) {
// Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed
// to have the same shard boundaries If that invariant was important, we would have to move this
// inside the loop above and also set it for the src servers
actors.push_back(krmSetRangeCoalescing(
tr, serverKeysPrefixFor(servers->at(i)), currentKeys, allKeys, serverKeysTrue));
}
wait(waitForAll(actors));
wait(tr->commit());
/*TraceEvent("StartMoveKeysCommitDone", relocationIntervalId)
.detail("CommitVersion", tr.getCommittedVersion())
.detail("ShardsInBatch", old.size() - 1);*/
return MoveKeysBatchInfo{ endKey, old.size() - 1 };
} catch (Error& e) {
state Error err = e;
if (err.code() == error_code_move_to_removed_server)
throw;
wait(tr->onError(e));
if (retries % 10 == 0) {
TraceEvent(retries == 50 ? SevWarnAlways : SevWarn, "StartMoveKeysRetrying", relocationIntervalId)
.error(err)
.detail("Keys", keys)
.detail("BeginKey", begin)
.detail("NumTries", retries);
}
}
}
}
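The value of the extraction is the per-batch contract: each call runs one retried transaction over at most a knob-limited slice of the range and reports the end key and shard count back, so the caller can advance begin and accumulate shards. A rough standalone sketch of that contract follows (illustrative names only; the RYW transaction, retries, and keyServers/serverKeys writes of the real code are omitted).
// Standalone illustration of the batching contract only.
#include <iostream>
#include <string>
#include <vector>
struct BatchInfo {
	std::string batchEnd; // first key the next batch should start from
	int batchShards = 0;  // shards handled by this batch
};
// Hypothetical per-batch worker: walks shard boundaries in (begin, end] and stops
// after `limit` shards, reporting where it stopped.
BatchInfo processOneBatch(const std::vector<std::string>& shardBoundaries,
                          const std::string& begin,
                          const std::string& end,
                          int limit) {
	BatchInfo info;
	info.batchEnd = end;
	for (const std::string& b : shardBoundaries) {
		if (b <= begin || b > end)
			continue;
		++info.batchShards;
		if (info.batchShards == limit) {
			info.batchEnd = b; // resume point for the caller
			break;
		}
	}
	return info;
}
int main() {
	std::vector<std::string> boundaries = { "b", "c", "d", "e", "f" };
	std::string begin = "a";
	const std::string end = "f";
	int shards = 0, batches = 0;
	while (begin < end) { // mirrors: while (begin < keys.end) in startMoveKeys
		BatchInfo info = processOneBatch(boundaries, begin, end, /*limit=*/2);
		shards += info.batchShards;
		begin = info.batchEnd;
		++batches;
	}
	std::cout << batches << " batches covering " << shards << " shards\n"; // 3 batches, 5 shards
	return 0;
}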
ACTOR static Future<Void> startMoveKeys(Database occ,
KeyRange keys,
std::vector<UID> servers,
MoveKeysLock lock,
FlowLock* startMoveKeysLock,
UID relocationIntervalId,
std::map<UID, StorageServerInterface>* tssMapping,
const DDEnabledState* ddEnabledState) {
state TraceInterval interval("RelocateShard_StartMoveKeys");
state Future<Void> warningLogger = logWarningAfter("StartMoveKeysTooLong", 600, servers);
// state TraceInterval waitInterval("");
@ -614,170 +785,18 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
while (begin < keys.end) {
CODE_PROBE(begin > keys.begin, "Multi-transactional startMoveKeys");
batches++;
// RYW to optimize re-reading the same key ranges
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(occ);
state int retries = 0;
loop {
try {
retries++;
// Keep track of old dests that may need to have ranges removed from serverKeys
state std::set<UID> oldDests;
// Keep track of shards for all src servers so that we can preserve their values in serverKeys
state Map<UID, VectorRef<KeyRangeRef>> shardMap;
tr->getTransaction().trState->taskID = TaskPriority::MoveKeys;
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(checkMoveKeysLock(&(tr->getTransaction()), lock, ddEnabledState));
if (!loadedTssMapping) {
// share transaction for loading tss mapping with the rest of start move keys
wait(readTSSMappingRYW(tr, tssMapping));
loadedTssMapping = true;
}
std::vector<Future<Optional<Value>>> serverListEntries;
serverListEntries.reserve(servers.size());
for (int s = 0; s < servers.size(); s++)
serverListEntries.push_back(tr->get(serverListKeyFor(servers[s])));
state std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
// Attempt to move onto a server that isn't in serverList (removed or never added to the
// database) This can happen (why?) and is handled by the data distribution algorithm
// FIXME: Answer why this can happen?
CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare);
throw move_to_removed_server();
}
}
// Get all existing shards overlapping keys (exclude any that have been processed in a previous
// iteration of the outer loop)
state KeyRange currentKeys = KeyRangeRef(begin, keys.end);
state RangeResult old = wait(krmGetRanges(tr,
keyServersPrefix,
currentKeys,
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
// Determine the last processed key (which will be the beginning for the next iteration)
state Key endKey = old.end()[-1].key;
currentKeys = KeyRangeRef(currentKeys.begin, endKey);
// TraceEvent("StartMoveKeysBatch", relocationIntervalId)
// .detail("KeyBegin", currentKeys.begin.toString())
// .detail("KeyEnd", currentKeys.end.toString());
// printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(),
// keys.end.toString().c_str(), old.size(), servers.size()); for(int i=0; i<old.size(); i++)
// printf("'%s': '%s'\n", old[i].key.toString().c_str(), old[i].value.toString().c_str());
// Check that enough servers for each shard are in the correct state
state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
std::vector<std::vector<UID>> addAsSource = wait(additionalSources(
old, tr, servers.size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER * servers.size()));
// For each intersecting range, update keyServers[range] dest to be servers and clear existing dest
// servers from serverKeys
for (int i = 0; i < old.size() - 1; ++i) {
KeyRangeRef rangeIntersectKeys(old[i].key, old[i + 1].key);
std::vector<UID> src;
std::vector<UID> dest;
decodeKeyServersValue(UIDtoTagMap, old[i].value, src, dest);
// TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
// .detail("KeyBegin", rangeIntersectKeys.begin.toString())
// .detail("KeyEnd", rangeIntersectKeys.end.toString())
// .detail("OldSrc", describe(src))
// .detail("OldDest", describe(dest))
// .detail("ReadVersion", tr->getReadVersion().get());
for (auto& uid : addAsSource[i]) {
src.push_back(uid);
}
uniquify(src);
// Update dest servers for this range to be equal to servers
krmSetPreviouslyEmptyRange(&(tr->getTransaction()),
keyServersPrefix,
rangeIntersectKeys,
keyServersValue(UIDtoTagMap, src, servers),
old[i + 1].value);
// Track old destination servers. They may be removed from serverKeys soon, since they are
// about to be overwritten in keyServers
for (auto s = dest.begin(); s != dest.end(); ++s) {
oldDests.insert(*s);
// TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s);
}
// Keep track of src shards so that we can preserve their values when we overwrite serverKeys
for (auto& uid : src) {
shardMap[uid].push_back(old.arena(), rangeIntersectKeys);
// TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid);
}
}
state std::set<UID>::iterator oldDest;
// Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the
// same prefix for a single transaction, we must do most of the coalescing ourselves. Only the
// shards on the boundary of currentRange are actually coalesced with the ranges outside of
// currentRange. For all shards internal to currentRange, we overwrite all consecutive keys whose
// value is or should be serverKeysFalse in a single write
std::vector<Future<Void>> actors;
for (oldDest = oldDests.begin(); oldDest != oldDests.end(); ++oldDest)
if (std::find(servers.begin(), servers.end(), *oldDest) == servers.end())
actors.push_back(removeOldDestinations(tr, *oldDest, shardMap[*oldDest], currentKeys));
// Update serverKeys to include keys (or the currently processed subset of keys) for each SS in
// servers
for (int i = 0; i < servers.size(); i++) {
// Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed
// to have the same shard boundaries If that invariant was important, we would have to move this
// inside the loop above and also set it for the src servers
actors.push_back(krmSetRangeCoalescing(
tr, serverKeysPrefixFor(servers[i]), currentKeys, allKeys, serverKeysTrue));
}
wait(waitForAll(actors));
wait(tr->commit());
/*TraceEvent("StartMoveKeysCommitDone", relocationIntervalId)
.detail("CommitVersion", tr.getCommittedVersion())
.detail("ShardsInBatch", old.size() - 1);*/
begin = endKey;
shards += old.size() - 1;
break;
} catch (Error& e) {
state Error err = e;
if (err.code() == error_code_move_to_removed_server)
throw;
wait(tr->onError(e));
if (retries % 10 == 0) {
TraceEvent(
retries == 50 ? SevWarnAlways : SevWarn, "StartMoveKeysRetrying", relocationIntervalId)
.error(err)
.detail("Keys", keys)
.detail("BeginKey", begin)
.detail("NumTries", retries);
}
}
}
if (retries > maxRetries) {
maxRetries = retries;
}
MoveKeysBatchInfo batchInfo = wait(startMoveKeysTransaction(occ,
keys,
&servers,
lock,
relocationIntervalId,
tssMapping,
ddEnabledState,
loadedTssMapping,
begin,
shards));
shards += batchInfo.batchShards;
begin = batchInfo.batchEnd;
}
// printf("Committed moving '%s'-'%s' (version %lld)\n", keys.begin.toString().c_str(),

View File

@ -112,6 +112,11 @@ public:
const MoveKeysLock& lock,
const DDEnabledState* ddEnabledState) const = 0;
virtual Future<Void> rawStartMovement(MoveKeysParams& params, std::map<UID, StorageServerInterface>& tssMapping) = 0;
virtual Future<Void> rawFinishMovement(MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) = 0;
virtual Future<Void> moveKeys(const MoveKeysParams& params) = 0;
virtual Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(KeyRange const& keys,
@ -198,6 +203,11 @@ public:
return ::removeStorageServer(cx, serverID, tssPairID, lock, ddEnabledState);
}
Future<Void> rawStartMovement(MoveKeysParams& params, std::map<UID, StorageServerInterface>& tssMapping) override;
Future<Void> rawFinishMovement(MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) override;
Future<Void> moveKeys(const MoveKeysParams& params) override { return ::moveKeys(cx, params); }
Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(KeyRange const& keys,
@ -257,6 +267,11 @@ public:
// test only
void setupMockGlobalState(Reference<InitialDataDistribution> initData);
Future<Void> rawStartMovement(MoveKeysParams& params, std::map<UID, StorageServerInterface>& tssMapping) override;
Future<Void> rawFinishMovement(MoveKeysParams& params,
const std::map<UID, StorageServerInterface>& tssMapping) override;
Future<Void> moveKeys(const MoveKeysParams& params) override;
Database context() const override { UNREACHABLE(); };
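The interface split also means any future driver can be written against IDDTxnProcessor alone and exercised with either the mock or the transaction-backed processor. A minimal sketch of that pattern, with simplified stand-in types rather than the real FDB signatures and futures, is below.
// Illustration of the interface-driven pattern only; MoveParams and the method
// bodies are simplified stand-ins for the real FDB types.
#include <cassert>
#include <iostream>
struct MoveParams { /* key range, destination team, ... */ };
class ITxnProcessor {
public:
	virtual ~ITxnProcessor() = default;
	virtual void rawStartMovement(MoveParams& params) = 0;
	virtual void rawFinishMovement(MoveParams& params) = 0;
};
// A driver written only against the interface; the same code path can exercise
// either an in-memory mock or a transaction-backed implementation.
void driveMove(ITxnProcessor& p, MoveParams params) {
	p.rawStartMovement(params);
	// ... in the real system, data copying and fetch-complete waiting happen here ...
	p.rawFinishMovement(params);
}
class InMemoryProcessor : public ITxnProcessor {
public:
	void rawStartMovement(MoveParams&) override { started = true; }
	void rawFinishMovement(MoveParams&) override {
		assert(started); // finish must not run before start
		finished = true;
	}
	bool started = false;
	bool finished = false;
};
int main() {
	InMemoryProcessor mock;
	driveMove(mock, MoveParams{});
	std::cout << (mock.started && mock.finished ? "movement completed" : "movement failed") << "\n";
	return 0;
}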

View File

@ -73,6 +73,8 @@ public:
decltype(serverKeys)::Ranges getAllRanges() { return serverKeys.ranges(); }
bool allShardStatusEqual(KeyRangeRef range, MockShardStatus status);
void setShardStatus(KeyRangeRef range, MockShardStatus status);
};
class MockGlobalState {