Revert "extract startMoveKeysTransaction method"

This reverts commit 014398a5a0d9b3442a5c626197f1d65ff6c177e3.
This commit is contained in:
Xiaoxi Wang 2022-10-15 23:05:52 -07:00
parent 885f8242d9
commit d7a408928a
1 changed files with 164 additions and 183 deletions

View File

@ -578,183 +578,12 @@ ACTOR Future<Void> logWarningAfter(const char* context, double duration, std::ve
struct MoveKeysBatchInfo {
Key batchEnd;
int batchShards = 0;
// keyServer: map from keys to destination servers
// serverKeys: two-dimension map: [servers][keys], value is the servers' state of having the keys: active(not-have),
// complete(already has), ""(). Set keyServers[keys].dest = servers Set serverKeys[servers][keys] = active for each
// subrange of keys that the server did not already have, complete for each subrange that it already has Set
// serverKeys[dest][keys] = "" for the dest servers of each existing shard in keys (unless that destination is a member
// of servers OR if the source list is sufficiently degraded)
ACTOR static Future<MoveKeysBatchInfo> startMoveKeysTransaction(Database occ,
KeyRange keys,
std::vector<UID>* servers,
MoveKeysLock lock,
UID relocationIntervalId,
std::map<UID, StorageServerInterface>* tssMapping,
const DDEnabledState* ddEnabledState,
bool loadedTssMapping,
Key begin,
int shards) {
state int retries = 0; // RYW to optimize re-reading the same key ranges
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(occ);
loop {
try {
// Keep track of old dests that may need to have ranges removed from serverKeys
state std::__1::set<UID> oldDests;
// Keep track of shards for all src servers so that we can preserve their values in serverKeys
state Map<UID, VectorRef<KeyRangeRef>> shardMap;
tr->getTransaction().trState->taskID = TaskPriority::MoveKeys;
wait(checkMoveKeysLock(&(tr->getTransaction()), lock, ddEnabledState));
if (!loadedTssMapping) {
// share transaction for loading tss mapping with the rest of start move keys
wait(readTSSMappingRYW(tr, tssMapping));
loadedTssMapping = true;
std::__1::vector<Future<Optional<Value>>> serverListEntries;
for (int s = 0; s < servers->size(); s++)
state std::__1::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
// Attempt to move onto a server that isn't in serverList (removed or never added to the
// database) This can happen (why?) and is handled by the data distribution algorithm
// FIXME: Answer why this can happen?
CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare);
throw move_to_removed_server();
// Get all existing shards overlapping keys (exclude any that have been processed in a previous
// iteration of the outer loop)
state KeyRange currentKeys = KeyRangeRef(begin, keys.end);
state RangeResult old = wait(krmGetRanges(tr,
// Determine the last processed key (which will be the beginning for the next iteration)
state Key endKey = old.end()[-1].key;
currentKeys = KeyRangeRef(currentKeys.begin, endKey);
// TraceEvent("StartMoveKeysBatch", relocationIntervalId)
// .detail("KeyBegin", currentKeys.begin.toString())
// .detail("KeyEnd", currentKeys.end.toString());
// printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(),
// keys.end.toString().c_str(), old.size(), servers.size()); for(int i=0; i<old.size(); i++)
// printf("'%s': '%s'\n", old[i].key.toString().c_str(), old[i].value.toString().c_str());
// Check that enough servers for each shard are in the correct state
state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
std::__1::vector<std::__1::vector<UID>> addAsSource = wait(additionalSources(
old, tr, servers->size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER * servers->size()));
// For each intersecting range, update keyServers[range] dest to be servers and clear existing dest
// servers from serverKeys
for (int i = 0; i < old.size() - 1; ++i) {
KeyRangeRef rangeIntersectKeys(old[i].key, old[i + 1].key);
std::__1::vector<UID> src;
std::__1::vector<UID> dest;
decodeKeyServersValue(UIDtoTagMap, old[i].value, src, dest);
// TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
// .detail("KeyBegin", rangeIntersectKeys.begin.toString())
// .detail("KeyEnd", rangeIntersectKeys.end.toString())
// .detail("OldSrc", describe(src))
// .detail("OldDest", describe(dest))
// .detail("ReadVersion", tr->getReadVersion().get());
for (auto& uid : addAsSource[i]) {
// Update dest servers for this range to be equal to servers
keyServersValue(UIDtoTagMap, src, *servers),
old[i + 1].value);
// Track old destination servers. They may be removed from serverKeys soon, since they are
// about to be overwritten in keyServers
for (auto s = dest.begin(); s != dest.end(); ++s) {
// TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s);
// Keep track of src shards so that we can preserve their values when we overwrite serverKeys
for (auto& uid : src) {
shardMap[uid].push_back(old.arena(), rangeIntersectKeys);
// TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid);
state std::__1::set<UID>::iterator oldDest;
// Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the
// same prefix for a single transaction, we must do most of the coalescing ourselves. Only the
// shards on the boundary of currentRange are actually coalesced with the ranges outside of
// currentRange. For all shards internal to currentRange, we overwrite all consecutive keys whose
// value is or should be serverKeysFalse in a single write
std::__1::vector<Future<Void>> actors;
for (oldDest = oldDests.begin(); oldDest != oldDests.end(); ++oldDest)
if (std::__1::find(servers->begin(), servers->end(), *oldDest) == servers->end())
actors.push_back(removeOldDestinations(tr, *oldDest, shardMap[*oldDest], currentKeys));
// Update serverKeys to include keys (or the currently processed subset of keys) for each SS in
// servers
for (int i = 0; i < servers->size(); i++) {
// Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed
// to have the same shard boundaries If that invariant was important, we would have to move this
// inside the loop above and also set it for the src servers
tr, serverKeysPrefixFor(servers->at(i)), currentKeys, allKeys, serverKeysTrue));
/*TraceEvent("StartMoveKeysCommitDone", relocationIntervalId)
.detail("CommitVersion", tr.getCommittedVersion())
.detail("ShardsInBatch", old.size() - 1);*/
return MoveKeysBatchInfo{ endKey, old.size() - 1 };
} catch (Error& e) {
state Error err = e;
if (err.code() == error_code_move_to_removed_server)
if (retries % 10 == 0) {
TraceEvent(retries == 50 ? SevWarnAlways : SevWarn, "StartMoveKeysRetrying", relocationIntervalId)
.detail("Keys", keys)
.detail("BeginKey", begin)
.detail("NumTries", retries);
ACTOR static Future<Void> startMoveKeys(Database occ,
KeyRange keys,
std::vector<UID> servers,
@ -785,18 +614,170 @@ ACTOR static Future<Void> startMoveKeys(Database occ,
while (begin < keys.end) {
CODE_PROBE(begin > keys.begin, "Multi-transactional startMoveKeys");
MoveKeysBatchInfo batchInfo = wait(startMoveKeysTransaction(occ,
shards += batchInfo.batchShards;
begin = batchInfo.batchEnd;
// RYW to optimize re-reading the same key ranges
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(occ);
state int retries = 0;
loop {
try {
// Keep track of old dests that may need to have ranges removed from serverKeys
state std::set<UID> oldDests;
// Keep track of shards for all src servers so that we can preserve their values in serverKeys
state Map<UID, VectorRef<KeyRangeRef>> shardMap;
tr->getTransaction().trState->taskID = TaskPriority::MoveKeys;
wait(checkMoveKeysLock(&(tr->getTransaction()), lock, ddEnabledState));
if (!loadedTssMapping) {
// share transaction for loading tss mapping with the rest of start move keys
wait(readTSSMappingRYW(tr, tssMapping));
loadedTssMapping = true;
std::vector<Future<Optional<Value>>> serverListEntries;
for (int s = 0; s < servers.size(); s++)
state std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
for (int s = 0; s < serverListValues.size(); s++) {
if (!serverListValues[s].present()) {
// Attempt to move onto a server that isn't in serverList (removed or never added to the
// database) This can happen (why?) and is handled by the data distribution algorithm
// FIXME: Answer why this can happen?
CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare);
throw move_to_removed_server();
// Get all existing shards overlapping keys (exclude any that have been processed in a previous
// iteration of the outer loop)
state KeyRange currentKeys = KeyRangeRef(begin, keys.end);
state RangeResult old = wait(krmGetRanges(tr,
// Determine the last processed key (which will be the beginning for the next iteration)
state Key endKey = old.end()[-1].key;
currentKeys = KeyRangeRef(currentKeys.begin, endKey);
// TraceEvent("StartMoveKeysBatch", relocationIntervalId)
// .detail("KeyBegin", currentKeys.begin.toString())
// .detail("KeyEnd", currentKeys.end.toString());
// printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(),
// keys.end.toString().c_str(), old.size(), servers.size()); for(int i=0; i<old.size(); i++)
// printf("'%s': '%s'\n", old[i].key.toString().c_str(), old[i].value.toString().c_str());
// Check that enough servers for each shard are in the correct state
state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
std::vector<std::vector<UID>> addAsSource = wait(additionalSources(
old, tr, servers.size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER * servers.size()));
// For each intersecting range, update keyServers[range] dest to be servers and clear existing dest
// servers from serverKeys
for (int i = 0; i < old.size() - 1; ++i) {
KeyRangeRef rangeIntersectKeys(old[i].key, old[i + 1].key);
std::vector<UID> src;
std::vector<UID> dest;
decodeKeyServersValue(UIDtoTagMap, old[i].value, src, dest);
// TraceEvent("StartMoveKeysOldRange", relocationIntervalId)
// .detail("KeyBegin", rangeIntersectKeys.begin.toString())
// .detail("KeyEnd", rangeIntersectKeys.end.toString())
// .detail("OldSrc", describe(src))
// .detail("OldDest", describe(dest))
// .detail("ReadVersion", tr->getReadVersion().get());
for (auto& uid : addAsSource[i]) {
// Update dest servers for this range to be equal to servers
keyServersValue(UIDtoTagMap, src, servers),
old[i + 1].value);
// Track old destination servers. They may be removed from serverKeys soon, since they are
// about to be overwritten in keyServers
for (auto s = dest.begin(); s != dest.end(); ++s) {
// TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s);
// Keep track of src shards so that we can preserve their values when we overwrite serverKeys
for (auto& uid : src) {
shardMap[uid].push_back(old.arena(), rangeIntersectKeys);
// TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid);
state std::set<UID>::iterator oldDest;
// Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the
// same prefix for a single transaction, we must do most of the coalescing ourselves. Only the
// shards on the boundary of currentRange are actually coalesced with the ranges outside of
// currentRange. For all shards internal to currentRange, we overwrite all consecutive keys whose
// value is or should be serverKeysFalse in a single write
std::vector<Future<Void>> actors;
for (oldDest = oldDests.begin(); oldDest != oldDests.end(); ++oldDest)
if (std::find(servers.begin(), servers.end(), *oldDest) == servers.end())
actors.push_back(removeOldDestinations(tr, *oldDest, shardMap[*oldDest], currentKeys));
// Update serverKeys to include keys (or the currently processed subset of keys) for each SS in
// servers
for (int i = 0; i < servers.size(); i++) {
// Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed
// to have the same shard boundaries If that invariant was important, we would have to move this
// inside the loop above and also set it for the src servers
tr, serverKeysPrefixFor(servers[i]), currentKeys, allKeys, serverKeysTrue));
/*TraceEvent("StartMoveKeysCommitDone", relocationIntervalId)
.detail("CommitVersion", tr.getCommittedVersion())
.detail("ShardsInBatch", old.size() - 1);*/
begin = endKey;
shards += old.size() - 1;
} catch (Error& e) {
state Error err = e;
if (err.code() == error_code_move_to_removed_server)
if (retries % 10 == 0) {
retries == 50 ? SevWarnAlways : SevWarn, "StartMoveKeysRetrying", relocationIntervalId)
.detail("Keys", keys)
.detail("BeginKey", begin)
.detail("NumTries", retries);
if (retries > maxRetries) {
maxRetries = retries;
// printf("Committed moving '%s'-'%s' (version %lld)\n", keys.begin.toString().c_str(),