From d7a408928a9994379a57fd355bcca7487762079e Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Sat, 15 Oct 2022 23:05:52 -0700 Subject: [PATCH] Revert "extract startMoveKeysTransaction method" This reverts commit 014398a5a0d9b3442a5c626197f1d65ff6c177e3. --- fdbserver/MoveKeys.actor.cpp | 347 +++++++++++++++++------------------ 1 file changed, 164 insertions(+), 183 deletions(-) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index d7dfbd74f8..51611229f9 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -578,183 +578,12 @@ ACTOR Future logWarningAfter(const char* context, double duration, std::ve } } -struct MoveKeysBatchInfo { - Key batchEnd; - int batchShards = 0; -}; - // keyServer: map from keys to destination servers // serverKeys: two-dimension map: [servers][keys], value is the servers' state of having the keys: active(not-have), // complete(already has), ""(). Set keyServers[keys].dest = servers Set serverKeys[servers][keys] = active for each // subrange of keys that the server did not already have, complete for each subrange that it already has Set // serverKeys[dest][keys] = "" for the dest servers of each existing shard in keys (unless that destination is a member // of servers OR if the source list is sufficiently degraded) -ACTOR static Future startMoveKeysTransaction(Database occ, - KeyRange keys, - std::vector* servers, - MoveKeysLock lock, - UID relocationIntervalId, - std::map* tssMapping, - const DDEnabledState* ddEnabledState, - bool loadedTssMapping, - Key begin, - int shards) { - state int retries = 0; // RYW to optimize re-reading the same key ranges - state Reference tr = makeReference(occ); - loop { - try { - retries++; - - // Keep track of old dests that may need to have ranges removed from serverKeys - state std::__1::set oldDests; - - // Keep track of shards for all src servers so that we can preserve their values in serverKeys - state Map> shardMap; - - tr->getTransaction().trState->taskID = TaskPriority::MoveKeys; - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - - wait(checkMoveKeysLock(&(tr->getTransaction()), lock, ddEnabledState)); - - if (!loadedTssMapping) { - // share transaction for loading tss mapping with the rest of start move keys - wait(readTSSMappingRYW(tr, tssMapping)); - loadedTssMapping = true; - } - - std::__1::vector>> serverListEntries; - serverListEntries.reserve(servers->size()); - for (int s = 0; s < servers->size(); s++) - serverListEntries.push_back(tr->get(serverListKeyFor(servers->at(s)))); - state std::__1::vector> serverListValues = wait(getAll(serverListEntries)); - - for (int s = 0; s < serverListValues.size(); s++) { - if (!serverListValues[s].present()) { - // Attempt to move onto a server that isn't in serverList (removed or never added to the - // database) This can happen (why?) and is handled by the data distribution algorithm - // FIXME: Answer why this can happen? - CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare); - throw move_to_removed_server(); - } - } - - // Get all existing shards overlapping keys (exclude any that have been processed in a previous - // iteration of the outer loop) - state KeyRange currentKeys = KeyRangeRef(begin, keys.end); - - state RangeResult old = wait(krmGetRanges(tr, - keyServersPrefix, - currentKeys, - SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, - SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); - - // Determine the last processed key (which will be the beginning for the next iteration) - state Key endKey = old.end()[-1].key; - currentKeys = KeyRangeRef(currentKeys.begin, endKey); - - // TraceEvent("StartMoveKeysBatch", relocationIntervalId) - // .detail("KeyBegin", currentKeys.begin.toString()) - // .detail("KeyEnd", currentKeys.end.toString()); - - // printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(), - // keys.end.toString().c_str(), old.size(), servers.size()); for(int i=0; igetRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); - ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); - std::__1::vector> addAsSource = wait(additionalSources( - old, tr, servers->size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER * servers->size())); - - // For each intersecting range, update keyServers[range] dest to be servers and clear existing dest - // servers from serverKeys - for (int i = 0; i < old.size() - 1; ++i) { - KeyRangeRef rangeIntersectKeys(old[i].key, old[i + 1].key); - std::__1::vector src; - std::__1::vector dest; - decodeKeyServersValue(UIDtoTagMap, old[i].value, src, dest); - - // TraceEvent("StartMoveKeysOldRange", relocationIntervalId) - // .detail("KeyBegin", rangeIntersectKeys.begin.toString()) - // .detail("KeyEnd", rangeIntersectKeys.end.toString()) - // .detail("OldSrc", describe(src)) - // .detail("OldDest", describe(dest)) - // .detail("ReadVersion", tr->getReadVersion().get()); - - for (auto& uid : addAsSource[i]) { - src.push_back(uid); - } - uniquify(src); - - // Update dest servers for this range to be equal to servers - krmSetPreviouslyEmptyRange(&(tr->getTransaction()), - keyServersPrefix, - rangeIntersectKeys, - keyServersValue(UIDtoTagMap, src, *servers), - old[i + 1].value); - - // Track old destination servers. They may be removed from serverKeys soon, since they are - // about to be overwritten in keyServers - for (auto s = dest.begin(); s != dest.end(); ++s) { - oldDests.insert(*s); - // TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s); - } - - // Keep track of src shards so that we can preserve their values when we overwrite serverKeys - for (auto& uid : src) { - shardMap[uid].push_back(old.arena(), rangeIntersectKeys); - // TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid); - } - } - - state std::__1::set::iterator oldDest; - - // Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the - // same prefix for a single transaction, we must do most of the coalescing ourselves. Only the - // shards on the boundary of currentRange are actually coalesced with the ranges outside of - // currentRange. For all shards internal to currentRange, we overwrite all consecutive keys whose - // value is or should be serverKeysFalse in a single write - std::__1::vector> actors; - for (oldDest = oldDests.begin(); oldDest != oldDests.end(); ++oldDest) - if (std::__1::find(servers->begin(), servers->end(), *oldDest) == servers->end()) - actors.push_back(removeOldDestinations(tr, *oldDest, shardMap[*oldDest], currentKeys)); - - // Update serverKeys to include keys (or the currently processed subset of keys) for each SS in - // servers - for (int i = 0; i < servers->size(); i++) { - // Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed - // to have the same shard boundaries If that invariant was important, we would have to move this - // inside the loop above and also set it for the src servers - actors.push_back(krmSetRangeCoalescing( - tr, serverKeysPrefixFor(servers->at(i)), currentKeys, allKeys, serverKeysTrue)); - } - - wait(waitForAll(actors)); - - wait(tr->commit()); - - /*TraceEvent("StartMoveKeysCommitDone", relocationIntervalId) - .detail("CommitVersion", tr.getCommittedVersion()) - .detail("ShardsInBatch", old.size() - 1);*/ - return MoveKeysBatchInfo{ endKey, old.size() - 1 }; - } catch (Error& e) { - state Error err = e; - if (err.code() == error_code_move_to_removed_server) - throw; - wait(tr->onError(e)); - - if (retries % 10 == 0) { - TraceEvent(retries == 50 ? SevWarnAlways : SevWarn, "StartMoveKeysRetrying", relocationIntervalId) - .error(err) - .detail("Keys", keys) - .detail("BeginKey", begin) - .detail("NumTries", retries); - } - } - } -} - ACTOR static Future startMoveKeys(Database occ, KeyRange keys, std::vector servers, @@ -785,18 +614,170 @@ ACTOR static Future startMoveKeys(Database occ, while (begin < keys.end) { CODE_PROBE(begin > keys.begin, "Multi-transactional startMoveKeys"); batches++; - MoveKeysBatchInfo batchInfo = wait(startMoveKeysTransaction(occ, - keys, - &servers, - lock, - relocationIntervalId, - tssMapping, - ddEnabledState, - loadedTssMapping, - begin, - shards)); - shards += batchInfo.batchShards; - begin = batchInfo.batchEnd; + + // RYW to optimize re-reading the same key ranges + state Reference tr = makeReference(occ); + state int retries = 0; + + loop { + try { + retries++; + + // Keep track of old dests that may need to have ranges removed from serverKeys + state std::set oldDests; + + // Keep track of shards for all src servers so that we can preserve their values in serverKeys + state Map> shardMap; + + tr->getTransaction().trState->taskID = TaskPriority::MoveKeys; + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + wait(checkMoveKeysLock(&(tr->getTransaction()), lock, ddEnabledState)); + + if (!loadedTssMapping) { + // share transaction for loading tss mapping with the rest of start move keys + wait(readTSSMappingRYW(tr, tssMapping)); + loadedTssMapping = true; + } + + std::vector>> serverListEntries; + serverListEntries.reserve(servers.size()); + for (int s = 0; s < servers.size(); s++) + serverListEntries.push_back(tr->get(serverListKeyFor(servers[s]))); + state std::vector> serverListValues = wait(getAll(serverListEntries)); + + for (int s = 0; s < serverListValues.size(); s++) { + if (!serverListValues[s].present()) { + // Attempt to move onto a server that isn't in serverList (removed or never added to the + // database) This can happen (why?) and is handled by the data distribution algorithm + // FIXME: Answer why this can happen? + CODE_PROBE(true, "start move keys moving to a removed server", probe::decoration::rare); + throw move_to_removed_server(); + } + } + + // Get all existing shards overlapping keys (exclude any that have been processed in a previous + // iteration of the outer loop) + state KeyRange currentKeys = KeyRangeRef(begin, keys.end); + + state RangeResult old = wait(krmGetRanges(tr, + keyServersPrefix, + currentKeys, + SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, + SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); + + // Determine the last processed key (which will be the beginning for the next iteration) + state Key endKey = old.end()[-1].key; + currentKeys = KeyRangeRef(currentKeys.begin, endKey); + + // TraceEvent("StartMoveKeysBatch", relocationIntervalId) + // .detail("KeyBegin", currentKeys.begin.toString()) + // .detail("KeyEnd", currentKeys.end.toString()); + + // printf("Moving '%s'-'%s' (%d) to %d servers\n", keys.begin.toString().c_str(), + // keys.end.toString().c_str(), old.size(), servers.size()); for(int i=0; igetRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + std::vector> addAsSource = wait(additionalSources( + old, tr, servers.size(), SERVER_KNOBS->MAX_ADDED_SOURCES_MULTIPLIER * servers.size())); + + // For each intersecting range, update keyServers[range] dest to be servers and clear existing dest + // servers from serverKeys + for (int i = 0; i < old.size() - 1; ++i) { + KeyRangeRef rangeIntersectKeys(old[i].key, old[i + 1].key); + std::vector src; + std::vector dest; + decodeKeyServersValue(UIDtoTagMap, old[i].value, src, dest); + + // TraceEvent("StartMoveKeysOldRange", relocationIntervalId) + // .detail("KeyBegin", rangeIntersectKeys.begin.toString()) + // .detail("KeyEnd", rangeIntersectKeys.end.toString()) + // .detail("OldSrc", describe(src)) + // .detail("OldDest", describe(dest)) + // .detail("ReadVersion", tr->getReadVersion().get()); + + for (auto& uid : addAsSource[i]) { + src.push_back(uid); + } + uniquify(src); + + // Update dest servers for this range to be equal to servers + krmSetPreviouslyEmptyRange(&(tr->getTransaction()), + keyServersPrefix, + rangeIntersectKeys, + keyServersValue(UIDtoTagMap, src, servers), + old[i + 1].value); + + // Track old destination servers. They may be removed from serverKeys soon, since they are + // about to be overwritten in keyServers + for (auto s = dest.begin(); s != dest.end(); ++s) { + oldDests.insert(*s); + // TraceEvent("StartMoveKeysOldDestAdd", relocationIntervalId).detail("Server", *s); + } + + // Keep track of src shards so that we can preserve their values when we overwrite serverKeys + for (auto& uid : src) { + shardMap[uid].push_back(old.arena(), rangeIntersectKeys); + // TraceEvent("StartMoveKeysShardMapAdd", relocationIntervalId).detail("Server", uid); + } + } + + state std::set::iterator oldDest; + + // Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the + // same prefix for a single transaction, we must do most of the coalescing ourselves. Only the + // shards on the boundary of currentRange are actually coalesced with the ranges outside of + // currentRange. For all shards internal to currentRange, we overwrite all consecutive keys whose + // value is or should be serverKeysFalse in a single write + std::vector> actors; + for (oldDest = oldDests.begin(); oldDest != oldDests.end(); ++oldDest) + if (std::find(servers.begin(), servers.end(), *oldDest) == servers.end()) + actors.push_back(removeOldDestinations(tr, *oldDest, shardMap[*oldDest], currentKeys)); + + // Update serverKeys to include keys (or the currently processed subset of keys) for each SS in + // servers + for (int i = 0; i < servers.size(); i++) { + // Since we are setting this for the entire range, serverKeys and keyServers aren't guaranteed + // to have the same shard boundaries If that invariant was important, we would have to move this + // inside the loop above and also set it for the src servers + actors.push_back(krmSetRangeCoalescing( + tr, serverKeysPrefixFor(servers[i]), currentKeys, allKeys, serverKeysTrue)); + } + + wait(waitForAll(actors)); + + wait(tr->commit()); + + /*TraceEvent("StartMoveKeysCommitDone", relocationIntervalId) + .detail("CommitVersion", tr.getCommittedVersion()) + .detail("ShardsInBatch", old.size() - 1);*/ + begin = endKey; + shards += old.size() - 1; + break; + } catch (Error& e) { + state Error err = e; + if (err.code() == error_code_move_to_removed_server) + throw; + wait(tr->onError(e)); + + if (retries % 10 == 0) { + TraceEvent( + retries == 50 ? SevWarnAlways : SevWarn, "StartMoveKeysRetrying", relocationIntervalId) + .error(err) + .detail("Keys", keys) + .detail("BeginKey", begin) + .detail("NumTries", retries); + } + } + } + + if (retries > maxRetries) { + maxRetries = retries; + } } // printf("Committed moving '%s'-'%s' (version %lld)\n", keys.begin.toString().c_str(),