From 6612cc00b62c1d5aa327ebbd38c7eeb8c95ffd66 Mon Sep 17 00:00:00 2001 From: helium Date: Wed, 1 Sep 2021 14:52:07 -0700 Subject: [PATCH 01/19] Check if the src server list will be empty before removing a failed server." --- fdbserver/DataDistribution.actor.cpp | 2 +- fdbserver/MoveKeys.actor.cpp | 82 +++++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 10 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 28b6911ca2..691b04dcde 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3581,7 +3581,7 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea bool healthy = !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize; team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam bool optimal = team->isOptimal() && healthy; - bool containsFailed = teamContainsFailedServer(self, team); + bool containsFailed = teamContainsFailedServer(self, team); // True if the team has already excluded by TC bool recheck = !healthy && (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get()) || containsFailed); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 164826f449..69b3e2e41e 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -236,6 +236,44 @@ ACTOR Future> addReadWriteDestinations(KeyRangeRef shard, return result; } +ACTOR Future> pickReadWriteServers(std::vector candidates, KeyRangeRef range, Transaction* tr) { + vector>> serverListEntries; + + for (const UID id : candidates) { + serverListEntries.push_back(tr->get(serverListKeyFor(id))); + } + + vector> serverListValues = wait(getAll(serverListEntries)); + + std::vector ssis; + for (auto& v : serverListValues) { + ssis.push_back(decodeServerListValue(v.get())); + } + + state std::vector>> checks; + checks.reserve(ssis.size()); + for (auto& ssi : ssis) { + checks.push_back(checkReadWrite( + ssi.getShardState.getReplyUnlessFailedFor(GetShardStateRequest(range, GetShardStateRequest::NO_WAIT), + SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, + 0, + TaskPriority::MoveKeys), + ssi.id(), + 0)); + } + + wait(waitForAll(checks)); + + vector result; + for (const auto& it : checks) { + if (it.get().present()) { + result.push_back(it.get().get()); + } + } + + return result; +} + ACTOR Future>> additionalSources(RangeResult shards, Reference tr, int desiredHealthy, @@ -1290,10 +1328,16 @@ ACTOR Future removeKeysFromFailedServer(Database cx, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); - for (int i = 0; i < keyServers.size() - 1; ++i) { - auto it = keyServers[i]; - vector src; - vector dest; + state std::vector serversToRemoveRange; + state vector src; + state vector dest; + state int i = 0; + for (i = 0; i < keyServers.size() - 1; ++i) { + src.clear(); + dest.clear(); + serversToRemoveRange.clear(); + serversToRemoveRange.push_back(serverID); + state KeyValueRef it = keyServers[i]; decodeKeyServersValue(UIDtoTagMap, it.value, src, dest); // The failed server is not present @@ -1306,11 +1350,31 @@ ACTOR Future removeKeysFromFailedServer(Database cx, // Dest is usually empty, but keep this in case there is parallel data movement src.erase(std::remove(src.begin(), src.end(), serverID), src.end()); dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end()); - TraceEvent(SevDebug, "FailedServerSetKey", serverID) - .detail("Key", it.key) - .detail("ValueSrc", describe(src)) - .detail("ValueDest", describe(dest)); - tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest)); + if (src.empty() && !dest.empty()) { + std::vector newSources = + wait(pickReadWriteServers(dest, KeyRangeRef(it.key, keyServers[i + 1].key), &tr)); + for (const UID& id : newSources) { + TraceEvent(SevWarn, "FailedServerAdditionalSourceServer", serverID) + .detail("Key", it.key) + .detail("NewSourceServerFromDest", id); + dest.erase(std::remove(dest.begin(), dest.end(), id), dest.end()); + src.push_back(id); + } + } + if (src.empty()) { + TraceEvent(SevWarn, "FailedServerRemoveRange", serverID) + .detail("Key", it.key) + .detail("ValueDest", describe(dest)); + tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest)); + serversToRemoveRange.insert(serversToRemoveRange.end(), dest.begin(), dest.end()); + tr.clear(keyServersKey(it.key)); + } else { + TraceEvent(SevDebug, "FailedServerSetKey", serverID) + .detail("Key", it.key) + .detail("ValueSrc", describe(src)) + .detail("ValueDest", describe(dest)); + tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest)); + } } // Set entire range for our serverID in serverKeys keyspace to false to signal erasure From a8a6f6b8a36e3765fc76b67345cc289ee37d9593 Mon Sep 17 00:00:00 2001 From: helium Date: Wed, 1 Sep 2021 15:35:40 -0700 Subject: [PATCH 02/19] Remove Transaction::set() when clearing the keyrange. --- fdbserver/MoveKeys.actor.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 69b3e2e41e..7fbc8178bb 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1365,7 +1365,6 @@ ACTOR Future removeKeysFromFailedServer(Database cx, TraceEvent(SevWarn, "FailedServerRemoveRange", serverID) .detail("Key", it.key) .detail("ValueDest", describe(dest)); - tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest)); serversToRemoveRange.insert(serversToRemoveRange.end(), dest.begin(), dest.end()); tr.clear(keyServersKey(it.key)); } else { From 7e7f9372dee6c5813eee2a5fcf3c06ed6f4c5e62 Mon Sep 17 00:00:00 2001 From: helium Date: Thu, 2 Sep 2021 21:58:05 -0700 Subject: [PATCH 03/19] Replaced clear() with krmSetRangeCoalescing(). --- fdbserver/MoveKeys.actor.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 7fbc8178bb..b1b5dc6431 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1346,6 +1346,11 @@ ACTOR Future removeKeysFromFailedServer(Database cx, continue; } + TraceEvent("FailedServerRemoveBegin", serverID) + .detail("Key", it.key) + .detail("ValueSrc", describe(src)) + .detail("ValueDest", describe(dest)); + // Update the vectors to remove failed server then set the value again // Dest is usually empty, but keep this in case there is parallel data movement src.erase(std::remove(src.begin(), src.end(), serverID), src.end()); @@ -1366,7 +1371,11 @@ ACTOR Future removeKeysFromFailedServer(Database cx, .detail("Key", it.key) .detail("ValueDest", describe(dest)); serversToRemoveRange.insert(serversToRemoveRange.end(), dest.begin(), dest.end()); - tr.clear(keyServersKey(it.key)); + wait(krmSetRangeCoalescing(&tr, + keyServersPrefix, + KeyRangeRef(it.key, keyServers[i + 1].key), + allKeys, + keyServers[i + 1].value)); } else { TraceEvent(SevDebug, "FailedServerSetKey", serverID) .detail("Key", it.key) From 8e0b572a188f999d9c65265863150c57a20064e6 Mon Sep 17 00:00:00 2001 From: helium Date: Thu, 2 Sep 2021 22:29:07 -0700 Subject: [PATCH 04/19] Added comments. --- fdbserver/DataDistribution.actor.cpp | 2 +- fdbserver/MoveKeys.actor.cpp | 21 +++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 691b04dcde..28b6911ca2 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3581,7 +3581,7 @@ ACTOR Future teamTracker(DDTeamCollection* self, Reference tea bool healthy = !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize; team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam bool optimal = team->isOptimal() && healthy; - bool containsFailed = teamContainsFailedServer(self, team); // True if the team has already excluded by TC + bool containsFailed = teamContainsFailedServer(self, team); bool recheck = !healthy && (lastReady != self->initialFailureReactionDelay.isReady() || (lastZeroHealthy && !self->zeroHealthyTeams->get()) || containsFailed); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index b1b5dc6431..1eb27f6738 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -236,7 +236,8 @@ ACTOR Future> addReadWriteDestinations(KeyRangeRef shard, return result; } -ACTOR Future> pickReadWriteServers(std::vector candidates, KeyRangeRef range, Transaction* tr) { +// Returns storage servers selected from 'candidates', who is serving a read-write copy of 'range'. +ACTOR Future> pickReadWriteServers(Transaction* tr, std::vector candidates, KeyRangeRef range) { vector>> serverListEntries; for (const UID id : candidates) { @@ -1328,15 +1329,11 @@ ACTOR Future removeKeysFromFailedServer(Database cx, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); - state std::vector serversToRemoveRange; - state vector src; - state vector dest; state int i = 0; - for (i = 0; i < keyServers.size() - 1; ++i) { - src.clear(); - dest.clear(); - serversToRemoveRange.clear(); - serversToRemoveRange.push_back(serverID); + for (; i < keyServers.size() - 1; ++i) { + state vector src; + state vector dest; + state std::vector serversToRemoveRange({ serverID }); state KeyValueRef it = keyServers[i]; decodeKeyServersValue(UIDtoTagMap, it.value, src, dest); @@ -1355,9 +1352,11 @@ ACTOR Future removeKeysFromFailedServer(Database cx, // Dest is usually empty, but keep this in case there is parallel data movement src.erase(std::remove(src.begin(), src.end(), serverID), src.end()); dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end()); + // If the last src server is to be removed, first check if there are dest servers who is + // hosting a read-write copy of the data, and move such dest servers to the src list. if (src.empty() && !dest.empty()) { std::vector newSources = - wait(pickReadWriteServers(dest, KeyRangeRef(it.key, keyServers[i + 1].key), &tr)); + wait(pickReadWriteServers(&tr, dest, KeyRangeRef(it.key, keyServers[i + 1].key))); for (const UID& id : newSources) { TraceEvent(SevWarn, "FailedServerAdditionalSourceServer", serverID) .detail("Key", it.key) @@ -1366,6 +1365,8 @@ ACTOR Future removeKeysFromFailedServer(Database cx, src.push_back(id); } } + // Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all + // dest servers. if (src.empty()) { TraceEvent(SevWarn, "FailedServerRemoveRange", serverID) .detail("Key", it.key) From 2ea055371be64df42d4c956126a440fc9b008c22 Mon Sep 17 00:00:00 2001 From: helium Date: Thu, 9 Sep 2021 14:15:07 -0700 Subject: [PATCH 05/19] Introduced serverKeysTrueEmptyRange to indicate the SS that the new shard is empty. --- fdbclient/SystemData.cpp | 4 +-- fdbclient/SystemData.h | 2 +- fdbserver/ApplyMetadataMutation.cpp | 6 +++- fdbserver/CommitProxyServer.actor.cpp | 1 + fdbserver/MoveKeys.actor.cpp | 50 +++++++++++++++++++++------ fdbserver/storageserver.actor.cpp | 5 ++- 6 files changed, 53 insertions(+), 15 deletions(-) diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index c3b7e85343..b9e10af298 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -275,7 +275,7 @@ std::pair>, vector& serve // as the key, the value indicates whether the shard does or does not exist on the server. // These values can be changed as data movement occurs. extern const KeyRef serverKeysPrefix; -extern const ValueRef serverKeysTrue, serverKeysFalse; +extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse; const Key serverKeysKey(UID serverID, const KeyRef& keys); const Key serverKeysPrefixFor(UID serverID); UID serverKeysDecodeServer(const KeyRef& key); diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index d79193ca37..c039f7efa0 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -35,7 +35,11 @@ Reference getStorageInfo(UID id, auto cacheItr = storageCache->find(id); if (cacheItr == storageCache->end()) { storageInfo = makeReference(); - storageInfo->tag = decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get()); + Optional tag = txnStateStore->readValue(serverTagKeyFor(id)).get(); + TraceEvent(SevWarn, "HeLiuDebuggetStorageInfo") + .detail("SSID", id) + .detail("Tag", tag.present() ? tag.get().toString() : ""); + storageInfo->tag = decodeServerTagValue(tag.get()); storageInfo->interf = decodeServerListValue(txnStateStore->readValue(serverListKeyFor(id)).get().get()); (*storageCache)[id] = storageInfo; } else { diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 413a4456e1..49e04b9f64 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -1916,6 +1916,7 @@ ACTOR Future processCompleteTransactionStateRequest(TransactionStateResolv updateTagInfo(src, info.tags, info.src_info); info.dest_info.clear(); + TraceEvent(SevWarn, "HeLiuDebugUpdateDest").detail("Dest", describe(dest)); updateTagInfo(dest, info.tags, info.dest_info); uniquify(info.tags); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 1eb27f6738..4472db9e45 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1259,6 +1259,7 @@ ACTOR Future removeStorageServer(Database cx, allLocalities.insert(dcId_locality[decodeTLogDatacentersKey(it.key)]); } + // If the storage server is in an invalid DC, remove the DC? if (locality >= 0 && !allLocalities.count(locality)) { for (auto& it : fTagLocalities.get()) { if (locality == decodeTagLocalityListValue(it.value)) { @@ -1305,7 +1306,11 @@ ACTOR Future removeKeysFromFailedServer(Database cx, UID serverID, MoveKeysLock lock, const DDEnabledState* ddEnabledState) { + state std::vector targetTeam; state Key begin = allKeys.begin; + + state vector src; + state vector dest; // Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit while (begin < allKeys.end) { state Transaction tr(cx); @@ -1328,12 +1333,23 @@ ACTOR Future removeKeysFromFailedServer(Database cx, KeyRangeRef(begin, allKeys.end), SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); + + for (int i = 0; i < keyServers.size() && targetTeam.empty(); ++i) { + decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); + if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { + targetTeam.insert(targetTeam.end(), dest.begin(), dest.end()); + } + if (!targetTeam.empty()) { + break; + } + if (std::find(src.begin(), src.end(), serverID) == src.end()) { + targetTeam.insert(targetTeam.end(), src.begin(), src.end()); + } + } + state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); state int i = 0; for (; i < keyServers.size() - 1; ++i) { - state vector src; - state vector dest; - state std::vector serversToRemoveRange({ serverID }); state KeyValueRef it = keyServers[i]; decodeKeyServersValue(UIDtoTagMap, it.value, src, dest); @@ -1368,15 +1384,29 @@ ACTOR Future removeKeysFromFailedServer(Database cx, // Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all // dest servers. if (src.empty()) { + ASSERT(!targetTeam.empty()); + tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, targetTeam, {})); + vector> actors; + for (const UID& id : dest) { + actors.push_back(krmSetRangeCoalescing(&tr, + serverKeysPrefixFor(id), + KeyRangeRef(it.key, keyServers[i + 1].key), + allKeys, + serverKeysFalse)); + } + // Update serverKeys to include keys. + for (const UID& id : targetTeam) { + actors.push_back(krmSetRangeCoalescing(&tr, + serverKeysPrefixFor(id), + KeyRangeRef(it.key, keyServers[i + 1].key), + allKeys, + serverKeysTrueEmptyRange)); + } TraceEvent(SevWarn, "FailedServerRemoveRange", serverID) .detail("Key", it.key) - .detail("ValueDest", describe(dest)); - serversToRemoveRange.insert(serversToRemoveRange.end(), dest.begin(), dest.end()); - wait(krmSetRangeCoalescing(&tr, - keyServersPrefix, - KeyRangeRef(it.key, keyServers[i + 1].key), - allKeys, - keyServers[i + 1].value)); + .detail("OldDest", describe(dest)) + .detail("NewTeam", describe(targetTeam)); + waitForAll(actors); } else { TraceEvent(SevDebug, "FailedServerSetKey", serverID) .detail("Key", it.key) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 9a300d4295..d81d3e7de2 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3470,6 +3470,7 @@ private: KeyRef startKey; bool nowAssigned; + bool emptyRange; bool processedStartKey; KeyRef cacheStartKey; @@ -3492,7 +3493,8 @@ private: // The changes for version have already been received (and are being processed now). We need to fetch // the data for change.version-1 (changes from versions < change.version) - changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE); + const Version shardVersion = (emptyRange && nowAssigned) ? 1 : currentVersion - 1; + changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE); } processedStartKey = false; @@ -3502,6 +3504,7 @@ private: // keys startKey = m.param1; nowAssigned = m.param2 != serverKeysFalse; + emptyRange = m.param2 == serverKeysTrueEmptyRange; processedStartKey = true; } else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) { // lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version) From 65e6f26f053b2daf62b36dbbb96c74aeeaa0151a Mon Sep 17 00:00:00 2001 From: helium Date: Thu, 9 Sep 2021 20:43:29 -0700 Subject: [PATCH 06/19] bug fix on unwaited actor --- .../sphinx/source/command-line-interface.rst | 5 +++++ fdbcli/fdbcli.actor.cpp | 15 +++++++++++++++ fdbserver/ApplyMetadataMutation.cpp | 2 +- fdbserver/MoveKeys.actor.cpp | 12 ++++++++---- fdbserver/storageserver.actor.cpp | 5 +++-- 5 files changed, 32 insertions(+), 7 deletions(-) diff --git a/documentation/sphinx/source/command-line-interface.rst b/documentation/sphinx/source/command-line-interface.rst index c7d717d799..b509dfe757 100644 --- a/documentation/sphinx/source/command-line-interface.rst +++ b/documentation/sphinx/source/command-line-interface.rst @@ -189,6 +189,11 @@ The ``getrangekeys`` command fetches keys in a range. Its syntax is ``getrangeke Note that :ref:`characters can be escaped ` when specifying keys (or values) in ``fdbcli``. +printkeymetadata +------------ + +The ``printkeymetadata`` command. + getversion ---------- diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 42501fd948..670684516b 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -601,6 +601,10 @@ void initHelp() { "fetch keys in a range of keys", "Displays up to LIMIT keys for keys between BEGINKEY (inclusive) and ENDKEY (exclusive). If ENDKEY is omitted, " "then the range will include all keys starting with BEGINKEY. LIMIT defaults to 25 if omitted." ESCAPINGK); + helpMap["printkeymetadata"] = CommandHelp( + "getrangekeys ", + "fetch location metadata for KEY", + "Displays location metadata for KEY" ESCAPINGK); helpMap["getversion"] = CommandHelp("getversion", "Fetch the current read version", @@ -3721,7 +3725,18 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { } continue; } + if (tokencmp(tokens[0], "printkeymetadata")) { // FIXME: support byte limits, and reverse range reads + RangeResult kvs = wait(makeInterruptable( + safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans) + ->getRange(keyServersKeys, 2000)))); + printf("\nRange limited to %d keys\n", limit); + for (auto iter = kvs.begin(); iter < kvs.end(); iter++) { + printf("`%s'\n", printable((*iter).key).c_str()); + } + printf("\n"); + continue; + } if (tokencmp(tokens[0], "writemode")) { if (tokens.size() != 2) { printUsage(tokens[0]); diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index c039f7efa0..76fa1a0aae 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -197,7 +197,7 @@ private: txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get()); MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - TraceEvent(SevDebug, "SendingPrivateMutation", dbgid) + TraceEvent("SendingPrivateMutation", dbgid) .detail("Original", m) .detail("Privatized", privatized) .detail("Server", serverKeysDecodeServer(m.param1)) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 4472db9e45..6e78526333 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1406,9 +1406,13 @@ ACTOR Future removeKeysFromFailedServer(Database cx, .detail("Key", it.key) .detail("OldDest", describe(dest)) .detail("NewTeam", describe(targetTeam)); - waitForAll(actors); + wait(waitForAll(actors)); + TraceEvent(SevWarn, "FailedServerRemoveRangeEnd", serverID) + .detail("Key", it.key) + .detail("OldDest", describe(dest)) + .detail("NewTeam", describe(targetTeam)); } else { - TraceEvent(SevDebug, "FailedServerSetKey", serverID) + TraceEvent("FailedServerSetKey", serverID) .detail("Key", it.key) .detail("ValueSrc", describe(src)) .detail("ValueDest", describe(dest)); @@ -1417,12 +1421,12 @@ ACTOR Future removeKeysFromFailedServer(Database cx, } // Set entire range for our serverID in serverKeys keyspace to false to signal erasure - TraceEvent(SevDebug, "FailedServerSetRange", serverID) + TraceEvent("FailedServerSetRange", serverID) .detail("Begin", currentKeys.begin) .detail("End", currentKeys.end); wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), currentKeys, allKeys, serverKeysFalse)); wait(tr.commit()); - TraceEvent(SevDebug, "FailedServerCommitSuccess", serverID) + TraceEvent("FailedServerCommitSuccess", serverID) .detail("Begin", currentKeys.begin) .detail("End", currentKeys.end) .detail("CommitVersion", tr.getCommittedVersion()); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index d81d3e7de2..73328b8b1b 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3320,6 +3320,7 @@ void changeServerKeys(StorageServer* data, } else if (!dataAvailable) { // SOMEDAY: Avoid restarting adding/transferred shards if (version == 0) { // bypass fetchkeys; shard is known empty at version 0 + TraceEvent("ChangeServerKeysEmptyRange", data->thisServerID); changeNewestAvailable.emplace_back(range, latestVersion); data->addShard(ShardInfo::newReadWrite(range, data)); setAvailableStatus(data, range, true); @@ -3477,7 +3478,7 @@ private: bool processedCacheStartKey; void applyPrivateData(StorageServer* data, MutationRef const& m) { - TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m); + TraceEvent("SSPrivateMutation", data->thisServerID).detail("Mutation", m); if (processedStartKey) { // Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end) @@ -3493,7 +3494,7 @@ private: // The changes for version have already been received (and are being processed now). We need to fetch // the data for change.version-1 (changes from versions < change.version) - const Version shardVersion = (emptyRange && nowAssigned) ? 1 : currentVersion - 1; + const Version shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1; changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE); } From cf1626f5533e45c4389f0b2b24972437402b41ef Mon Sep 17 00:00:00 2001 From: helium Date: Fri, 10 Sep 2021 11:10:10 -0700 Subject: [PATCH 07/19] Added comments. --- fdbclient/SystemData.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index b9e10af298..dedcc26588 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -275,7 +275,8 @@ std::pair>, vector Date: Fri, 10 Sep 2021 11:34:34 -0700 Subject: [PATCH 08/19] revert test traceEvents --- .../sphinx/source/command-line-interface.rst | 5 ----- fdbcli/fdbcli.actor.cpp | 15 --------------- fdbserver/ApplyMetadataMutation.cpp | 8 ++------ fdbserver/CommitProxyServer.actor.cpp | 1 - 4 files changed, 2 insertions(+), 27 deletions(-) diff --git a/documentation/sphinx/source/command-line-interface.rst b/documentation/sphinx/source/command-line-interface.rst index b509dfe757..c7d717d799 100644 --- a/documentation/sphinx/source/command-line-interface.rst +++ b/documentation/sphinx/source/command-line-interface.rst @@ -189,11 +189,6 @@ The ``getrangekeys`` command fetches keys in a range. Its syntax is ``getrangeke Note that :ref:`characters can be escaped ` when specifying keys (or values) in ``fdbcli``. -printkeymetadata ------------- - -The ``printkeymetadata`` command. - getversion ---------- diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 670684516b..42501fd948 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -601,10 +601,6 @@ void initHelp() { "fetch keys in a range of keys", "Displays up to LIMIT keys for keys between BEGINKEY (inclusive) and ENDKEY (exclusive). If ENDKEY is omitted, " "then the range will include all keys starting with BEGINKEY. LIMIT defaults to 25 if omitted." ESCAPINGK); - helpMap["printkeymetadata"] = CommandHelp( - "getrangekeys ", - "fetch location metadata for KEY", - "Displays location metadata for KEY" ESCAPINGK); helpMap["getversion"] = CommandHelp("getversion", "Fetch the current read version", @@ -3725,18 +3721,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { } continue; } - if (tokencmp(tokens[0], "printkeymetadata")) { // FIXME: support byte limits, and reverse range reads - RangeResult kvs = wait(makeInterruptable( - safeThreadFutureToFuture(getTransaction(db, tr, tr2, options, intrans) - ->getRange(keyServersKeys, 2000)))); - printf("\nRange limited to %d keys\n", limit); - for (auto iter = kvs.begin(); iter < kvs.end(); iter++) { - printf("`%s'\n", printable((*iter).key).c_str()); - } - printf("\n"); - continue; - } if (tokencmp(tokens[0], "writemode")) { if (tokens.size() != 2) { printUsage(tokens[0]); diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 76fa1a0aae..d79193ca37 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -35,11 +35,7 @@ Reference getStorageInfo(UID id, auto cacheItr = storageCache->find(id); if (cacheItr == storageCache->end()) { storageInfo = makeReference(); - Optional tag = txnStateStore->readValue(serverTagKeyFor(id)).get(); - TraceEvent(SevWarn, "HeLiuDebuggetStorageInfo") - .detail("SSID", id) - .detail("Tag", tag.present() ? tag.get().toString() : ""); - storageInfo->tag = decodeServerTagValue(tag.get()); + storageInfo->tag = decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get()); storageInfo->interf = decodeServerListValue(txnStateStore->readValue(serverListKeyFor(id)).get().get()); (*storageCache)[id] = storageInfo; } else { @@ -197,7 +193,7 @@ private: txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get()); MutationRef privatized = m; privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - TraceEvent("SendingPrivateMutation", dbgid) + TraceEvent(SevDebug, "SendingPrivateMutation", dbgid) .detail("Original", m) .detail("Privatized", privatized) .detail("Server", serverKeysDecodeServer(m.param1)) diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 49e04b9f64..413a4456e1 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -1916,7 +1916,6 @@ ACTOR Future processCompleteTransactionStateRequest(TransactionStateResolv updateTagInfo(src, info.tags, info.src_info); info.dest_info.clear(); - TraceEvent(SevWarn, "HeLiuDebugUpdateDest").detail("Dest", describe(dest)); updateTagInfo(dest, info.tags, info.dest_info); uniquify(info.tags); From 7e53f8662d8e6d1889c5c191feca56ee10cab56e Mon Sep 17 00:00:00 2001 From: helium Date: Mon, 13 Sep 2021 13:28:55 -0700 Subject: [PATCH 09/19] added comments --- fdbserver/DataDistribution.actor.cpp | 3 +++ fdbserver/MoveKeys.actor.cpp | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 28b6911ca2..b23af540f4 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -893,6 +893,9 @@ struct DDTeamCollection : ReferenceCounted { return Void(); } + Optional getRandomHealthyTeam() { + + } // SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case) // use keys, src, dest, metrics, priority, system load, etc.. to decide... ACTOR static Future getTeam(DDTeamCollection* self, GetTeamRequest req) { diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 6e78526333..34a78a5441 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1259,7 +1259,6 @@ ACTOR Future removeStorageServer(Database cx, allLocalities.insert(dcId_locality[decodeTLogDatacentersKey(it.key)]); } - // If the storage server is in an invalid DC, remove the DC? if (locality >= 0 && !allLocalities.count(locality)) { for (auto& it : fTagLocalities.get()) { if (locality == decodeTagLocalityListValue(it.value)) { From fd6d0889459a801c0be8d5a728689c17739d66ff Mon Sep 17 00:00:00 2001 From: helium Date: Tue, 14 Sep 2021 19:24:59 -0700 Subject: [PATCH 10/19] choose team before removing server --- fdbserver/DataDistribution.actor.cpp | 27 ++++++++++++++++-- fdbserver/MoveKeys.actor.cpp | 41 ++++++++++++++-------------- fdbserver/MoveKeys.actor.h | 1 + 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index b23af540f4..5a7e0e9471 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -893,9 +893,22 @@ struct DDTeamCollection : ReferenceCounted { return Void(); } - Optional getRandomHealthyTeam() { - + std::vector getRandomHealthyTeam() { + int count = 0; + Optional idx; + for (int i = 0; i < teams.size(); ++i) { + if (teams[i]->isHealthy()) { + if (std::rand() % ++count == 0) { + idx = i; + } + } + } + if (idx.present()) { + return teams[idx.get()]->getServerIDs(); + } + return std::vector(); } + // SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case) // use keys, src, dest, metrics, priority, system load, etc.. to decide... ACTOR static Future getTeam(DDTeamCollection* self, GetTeamRequest req) { @@ -6125,6 +6138,13 @@ ACTOR Future dataDistribution(Reference self, trackerCancelled = true; state Error err = e; TraceEvent("DataDistributorDestroyTeamCollections").error(e); + state std::vector teamForDroppedRange; + std::vector pTeam = primaryTeamCollection->getRandomHealthyTeam(); + teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end()); + if (configuration.usableRegions > 1) { + std::vector rTeam = remoteTeamCollection->getRandomHealthyTeam(); + teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end()); + } self->teamCollection = nullptr; primaryTeamCollection = Reference(); remoteTeamCollection = Reference(); @@ -6132,7 +6152,8 @@ ACTOR Future dataDistribution(Reference self, TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err); if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) { TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err); - wait(removeKeysFromFailedServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState)); + wait(removeKeysFromFailedServer( + cx, removeFailedServer.getFuture().get(), teamForDroppedRange, lock, ddEnabledState)); Optional tssPairID; wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), tssPairID, lock, ddEnabledState)); } else { diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 34a78a5441..7550cf8470 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1303,9 +1303,10 @@ ACTOR Future removeStorageServer(Database cx, // Changes to keyServer and serverKey must happen symmetrically in a transaction. ACTOR Future removeKeysFromFailedServer(Database cx, UID serverID, + std::vector teamForDroppedRange, MoveKeysLock lock, const DDEnabledState* ddEnabledState) { - state std::vector targetTeam; + // state std::vector teamForDroppedRange; state Key begin = allKeys.begin; state vector src; @@ -1333,18 +1334,18 @@ ACTOR Future removeKeysFromFailedServer(Database cx, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); - for (int i = 0; i < keyServers.size() && targetTeam.empty(); ++i) { - decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); - if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { - targetTeam.insert(targetTeam.end(), dest.begin(), dest.end()); - } - if (!targetTeam.empty()) { - break; - } - if (std::find(src.begin(), src.end(), serverID) == src.end()) { - targetTeam.insert(targetTeam.end(), src.begin(), src.end()); - } - } + // for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) { + // decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); + // if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { + // teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end()); + // } + // if (!teamForDroppedRange.empty()) { + // break; + // } + // if (std::find(src.begin(), src.end(), serverID) == src.end()) { + // teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end()); + // } + // } state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); state int i = 0; @@ -1383,8 +1384,10 @@ ACTOR Future removeKeysFromFailedServer(Database cx, // Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all // dest servers. if (src.empty()) { - ASSERT(!targetTeam.empty()); - tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, targetTeam, {})); + if (teamForDroppedRange.empty()) { + throw internal_error_msg("No team for the dropped range."); + } + tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {})); vector> actors; for (const UID& id : dest) { actors.push_back(krmSetRangeCoalescing(&tr, @@ -1394,7 +1397,7 @@ ACTOR Future removeKeysFromFailedServer(Database cx, serverKeysFalse)); } // Update serverKeys to include keys. - for (const UID& id : targetTeam) { + for (const UID& id : teamForDroppedRange) { actors.push_back(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(id), KeyRangeRef(it.key, keyServers[i + 1].key), @@ -1404,12 +1407,8 @@ ACTOR Future removeKeysFromFailedServer(Database cx, TraceEvent(SevWarn, "FailedServerRemoveRange", serverID) .detail("Key", it.key) .detail("OldDest", describe(dest)) - .detail("NewTeam", describe(targetTeam)); + .detail("NewTeam", describe(teamForDroppedRange)); wait(waitForAll(actors)); - TraceEvent(SevWarn, "FailedServerRemoveRangeEnd", serverID) - .detail("Key", it.key) - .detail("OldDest", describe(dest)) - .detail("NewTeam", describe(targetTeam)); } else { TraceEvent("FailedServerSetKey", serverID) .detail("Key", it.key) diff --git a/fdbserver/MoveKeys.actor.h b/fdbserver/MoveKeys.actor.h index c8092bbcdd..f876b4eacb 100644 --- a/fdbserver/MoveKeys.actor.h +++ b/fdbserver/MoveKeys.actor.h @@ -101,6 +101,7 @@ ACTOR Future canRemoveStorageServer(Reference t // Obviously that could change later! ACTOR Future removeKeysFromFailedServer(Database cx, UID serverID, + std::vector teamForDroppedRange, MoveKeysLock lock, const DDEnabledState* ddEnabledState); // Directly removes serverID from serverKeys and keyServers system keyspace. From e838d63011aea1a03bf28027ceda126d55da7a29 Mon Sep 17 00:00:00 2001 From: helium Date: Tue, 14 Sep 2021 20:57:58 -0700 Subject: [PATCH 11/19] exclude to-be-removed server from target team --- fdbserver/MoveKeys.actor.cpp | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 7550cf8470..b6c54e4033 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1334,18 +1334,19 @@ ACTOR Future removeKeysFromFailedServer(Database cx, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); - // for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) { - // decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); - // if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { - // teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end()); - // } - // if (!teamForDroppedRange.empty()) { - // break; - // } - // if (std::find(src.begin(), src.end(), serverID) == src.end()) { - // teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end()); - // } - // } + teamForDroppedRange.clear(); + for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) { + decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); + if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { + teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end()); + } + if (!teamForDroppedRange.empty()) { + break; + } + if (std::find(src.begin(), src.end(), serverID) == src.end()) { + teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end()); + } + } state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); state int i = 0; @@ -1384,9 +1385,7 @@ ACTOR Future removeKeysFromFailedServer(Database cx, // Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all // dest servers. if (src.empty()) { - if (teamForDroppedRange.empty()) { - throw internal_error_msg("No team for the dropped range."); - } + assert(!teamForDroppedRange.empty()); tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {})); vector> actors; for (const UID& id : dest) { From c8a3413820bb342649797b8021e6d69fa737c4c1 Mon Sep 17 00:00:00 2001 From: He Liu Date: Wed, 15 Sep 2021 09:07:50 -0700 Subject: [PATCH 12/19] exclude to-be-dropped server from the random team --- fdbserver/DataDistribution.actor.cpp | 21 ++++++++++++++------- fdbserver/MoveKeys.actor.cpp | 26 +++++++++++++------------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 5a7e0e9471..33a6201f5c 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -357,6 +357,10 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } + bool hasServer(const UID& server) { + return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end(); + } + void addServers(const vector& servers) override { serverIDs.reserve(servers.size()); for (int i = 0; i < servers.size(); i++) { @@ -893,11 +897,11 @@ struct DDTeamCollection : ReferenceCounted { return Void(); } - std::vector getRandomHealthyTeam() { + std::vector getRandomHealthyTeam(const UID& excludeServer) { int count = 0; Optional idx; for (int i = 0; i < teams.size(); ++i) { - if (teams[i]->isHealthy()) { + if (teams[i]->isHealthy() && !teams[i]->hasServer(excludeServer)) { if (std::rand() % ++count == 0) { idx = i; } @@ -6139,11 +6143,14 @@ ACTOR Future dataDistribution(Reference self, state Error err = e; TraceEvent("DataDistributorDestroyTeamCollections").error(e); state std::vector teamForDroppedRange; - std::vector pTeam = primaryTeamCollection->getRandomHealthyTeam(); - teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end()); - if (configuration.usableRegions > 1) { - std::vector rTeam = remoteTeamCollection->getRandomHealthyTeam(); - teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end()); + if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) { + const UID serverID = removeFailedServer.getFuture().get(); + std::vector pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID); + teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end()); + if (configuration.usableRegions > 1) { + std::vector rTeam = remoteTeamCollection->getRandomHealthyTeam(serverID); + teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end()); + } } self->teamCollection = nullptr; primaryTeamCollection = Reference(); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index b6c54e4033..a6ba7f11df 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1334,19 +1334,19 @@ ACTOR Future removeKeysFromFailedServer(Database cx, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); - teamForDroppedRange.clear(); - for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) { - decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); - if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { - teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end()); - } - if (!teamForDroppedRange.empty()) { - break; - } - if (std::find(src.begin(), src.end(), serverID) == src.end()) { - teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end()); - } - } + // teamForDroppedRange.clear(); + // for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) { + // decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); + // if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { + // teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end()); + // } + // if (!teamForDroppedRange.empty()) { + // break; + // } + // if (std::find(src.begin(), src.end(), serverID) == src.end()) { + // teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end()); + // } + // } state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); state int i = 0; From ef7fdc0781c1aa92e0abcac3159563977876304a Mon Sep 17 00:00:00 2001 From: He Liu Date: Wed, 15 Sep 2021 10:32:09 -0700 Subject: [PATCH 13/19] fmt --- fdbserver/DataDistribution.actor.cpp | 2 + fdbserver/MoveKeys.actor.cpp | 60 ++++++++++++++-------------- fdbserver/storageserver.actor.cpp | 4 +- 3 files changed, 34 insertions(+), 32 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 33a6201f5c..c3b7e67a07 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -897,6 +897,7 @@ struct DDTeamCollection : ReferenceCounted { return Void(); } + // Returns a random healthy team, which does not contain excludeServer. std::vector getRandomHealthyTeam(const UID& excludeServer) { int count = 0; Optional idx; @@ -6144,6 +6145,7 @@ ACTOR Future dataDistribution(Reference self, TraceEvent("DataDistributorDestroyTeamCollections").error(e); state std::vector teamForDroppedRange; if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) { + // Choose a random healthy team to host the to-be-dropped range. const UID serverID = removeFailedServer.getFuture().get(); std::vector pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID); teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end()); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index a6ba7f11df..4030f70408 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1301,6 +1301,8 @@ ACTOR Future removeStorageServer(Database cx, } // Remove the server from keyServer list and set serverKeysFalse to the server's serverKeys list. // Changes to keyServer and serverKey must happen symmetrically in a transaction. +// If serverID is the last source server for a shard, the shard will be erased, and then be assigned +// to teamForDroppedRange. ACTOR Future removeKeysFromFailedServer(Database cx, UID serverID, std::vector teamForDroppedRange, @@ -1333,21 +1335,6 @@ ACTOR Future removeKeysFromFailedServer(Database cx, KeyRangeRef(begin, allKeys.end), SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); - - // teamForDroppedRange.clear(); - // for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) { - // decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); - // if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) { - // teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end()); - // } - // if (!teamForDroppedRange.empty()) { - // break; - // } - // if (std::find(src.begin(), src.end(), serverID) == src.end()) { - // teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end()); - // } - // } - state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); state int i = 0; for (; i < keyServers.size() - 1; ++i) { @@ -1360,17 +1347,13 @@ ACTOR Future removeKeysFromFailedServer(Database cx, continue; } - TraceEvent("FailedServerRemoveBegin", serverID) - .detail("Key", it.key) - .detail("ValueSrc", describe(src)) - .detail("ValueDest", describe(dest)); - // Update the vectors to remove failed server then set the value again // Dest is usually empty, but keep this in case there is parallel data movement src.erase(std::remove(src.begin(), src.end(), serverID), src.end()); dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end()); + // If the last src server is to be removed, first check if there are dest servers who is - // hosting a read-write copy of the data, and move such dest servers to the src list. + // hosting a read-write copy of the keyrange, and move such dest servers to the src list. if (src.empty() && !dest.empty()) { std::vector newSources = wait(pickReadWriteServers(&tr, dest, KeyRangeRef(it.key, keyServers[i + 1].key))); @@ -1382,12 +1365,18 @@ ACTOR Future removeKeysFromFailedServer(Database cx, src.push_back(id); } } - // Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all - // dest servers. + + // Move the keyrange to teamForDroppedRange if the src list becomes empty, and also remove the shard + // from all dest servers. if (src.empty()) { assert(!teamForDroppedRange.empty()); + + // Assign the shard to teamFroDroppedRange in keyServer space. tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {})); + vector> actors; + + // Unassign the shard from the dest servers. for (const UID& id : dest) { actors.push_back(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(id), @@ -1395,7 +1384,15 @@ ACTOR Future removeKeysFromFailedServer(Database cx, allKeys, serverKeysFalse)); } - // Update serverKeys to include keys. + if (!dest.empty()) { + TraceEvent(SevWarn, "FailedServerDropRangeFromDest", serverID) + .detail("Begin", it.key) + .detail("End", keyServers[i + 1].key) + .detail("Dest", describe(dest)); + } + + // Assign the shard to the new team as an empty range. + // Note, there could be data loss. for (const UID& id : teamForDroppedRange) { actors.push_back(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(id), @@ -1403,13 +1400,14 @@ ACTOR Future removeKeysFromFailedServer(Database cx, allKeys, serverKeysTrueEmptyRange)); } - TraceEvent(SevWarn, "FailedServerRemoveRange", serverID) - .detail("Key", it.key) - .detail("OldDest", describe(dest)) - .detail("NewTeam", describe(teamForDroppedRange)); + wait(waitForAll(actors)); + TraceEvent(SevWarn, "FailedServerDropRange", serverID) + .detail("Begin", it.key) + .detail("End", keyServers[i + 1].key) + .detail("NewTeam", describe(teamForDroppedRange)); } else { - TraceEvent("FailedServerSetKey", serverID) + TraceEvent(SevDebug, "FailedServerSetKey", serverID) .detail("Key", it.key) .detail("ValueSrc", describe(src)) .detail("ValueDest", describe(dest)); @@ -1418,12 +1416,12 @@ ACTOR Future removeKeysFromFailedServer(Database cx, } // Set entire range for our serverID in serverKeys keyspace to false to signal erasure - TraceEvent("FailedServerSetRange", serverID) + TraceEvent(SevDebug, "FailedServerSetRange", serverID) .detail("Begin", currentKeys.begin) .detail("End", currentKeys.end); wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), currentKeys, allKeys, serverKeysFalse)); wait(tr.commit()); - TraceEvent("FailedServerCommitSuccess", serverID) + TraceEvent(SevDebug, "FailedServerCommitSuccess", serverID) .detail("Begin", currentKeys.begin) .detail("End", currentKeys.end) .detail("CommitVersion", tr.getCommittedVersion()); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 73328b8b1b..36976f5f73 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3478,7 +3478,7 @@ private: bool processedCacheStartKey; void applyPrivateData(StorageServer* data, MutationRef const& m) { - TraceEvent("SSPrivateMutation", data->thisServerID).detail("Mutation", m); + TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m); if (processedStartKey) { // Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end) @@ -3494,6 +3494,8 @@ private: // The changes for version have already been received (and are being processed now). We need to fetch // the data for change.version-1 (changes from versions < change.version) + // If emptyRange, treat the shard as empty, see removeKeysFromFailedServer() for more details about this + // scenario. const Version shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1; changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE); } From 4d5bf08da858c20697dd7d67a55bb11787e8aec0 Mon Sep 17 00:00:00 2001 From: He Liu Date: Sun, 19 Sep 2021 17:03:06 -0700 Subject: [PATCH 14/19] address comments --- fdbserver/DataDistribution.actor.cpp | 4 +++- fdbserver/MoveKeys.actor.cpp | 20 +++++++++----------- fdbserver/storageserver.actor.cpp | 4 +++- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index c3b7e67a07..4b1b962302 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -898,12 +898,14 @@ struct DDTeamCollection : ReferenceCounted { } // Returns a random healthy team, which does not contain excludeServer. + // The simplest reservoir sampling algorithm is used, since efficiency is not a big concern here, + // given that dropping an entire team is considerred a rare event. std::vector getRandomHealthyTeam(const UID& excludeServer) { int count = 0; Optional idx; for (int i = 0; i < teams.size(); ++i) { if (teams[i]->isHealthy() && !teams[i]->hasServer(excludeServer)) { - if (std::rand() % ++count == 0) { + if (deterministicRandom().randomInt(0, ++count) == 0) { idx = i; } } diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 4030f70408..b3093793c2 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1371,7 +1371,7 @@ ACTOR Future removeKeysFromFailedServer(Database cx, if (src.empty()) { assert(!teamForDroppedRange.empty()); - // Assign the shard to teamFroDroppedRange in keyServer space. + // Assign the shard to teamForDroppedRange in keyServer space. tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {})); vector> actors; @@ -1384,12 +1384,6 @@ ACTOR Future removeKeysFromFailedServer(Database cx, allKeys, serverKeysFalse)); } - if (!dest.empty()) { - TraceEvent(SevWarn, "FailedServerDropRangeFromDest", serverID) - .detail("Begin", it.key) - .detail("End", keyServers[i + 1].key) - .detail("Dest", describe(dest)); - } // Assign the shard to the new team as an empty range. // Note, there could be data loss. @@ -1402,10 +1396,14 @@ ACTOR Future removeKeysFromFailedServer(Database cx, } wait(waitForAll(actors)); - TraceEvent(SevWarn, "FailedServerDropRange", serverID) - .detail("Begin", it.key) - .detail("End", keyServers[i + 1].key) - .detail("NewTeam", describe(teamForDroppedRange)); + + TraceEvent trace(SevWarnAlways, "ShardLossAllReplicasDropShard", serverID); + trace.detail("Begin", it.key); + trace.detail("End", keyServers[i + 1].key); + if (!dest.empty()) { + trace.detail("DropedDest", describe(dest)); + } + trace.detail("NewTeamForDroppedShard", describe(teamForDroppedRange)); } else { TraceEvent(SevDebug, "FailedServerSetKey", serverID) .detail("Key", it.key) diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 36976f5f73..374a5efc83 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -3320,7 +3320,9 @@ void changeServerKeys(StorageServer* data, } else if (!dataAvailable) { // SOMEDAY: Avoid restarting adding/transferred shards if (version == 0) { // bypass fetchkeys; shard is known empty at version 0 - TraceEvent("ChangeServerKeysEmptyRange", data->thisServerID); + TraceEvent("ChangeServerKeysAddEmptyRange", data->thisServerID) + .detail("Begin", range.begin) + .detail("End", range.end); changeNewestAvailable.emplace_back(range, latestVersion); data->addShard(ShardInfo::newReadWrite(range, data)); setAvailableStatus(data, range, true); From 2246a0bee755fccdd2ca525465122ef97cba4d04 Mon Sep 17 00:00:00 2001 From: He Liu Date: Mon, 20 Sep 2021 11:23:54 -0700 Subject: [PATCH 15/19] switch to plain random selection --- fdbserver/DataDistribution.actor.cpp | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 4b1b962302..85b2576c63 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -898,21 +898,31 @@ struct DDTeamCollection : ReferenceCounted { } // Returns a random healthy team, which does not contain excludeServer. - // The simplest reservoir sampling algorithm is used, since efficiency is not a big concern here, - // given that dropping an entire team is considerred a rare event. std::vector getRandomHealthyTeam(const UID& excludeServer) { - int count = 0; - Optional idx; + std::vector candidates, backup; for (int i = 0; i < teams.size(); ++i) { if (teams[i]->isHealthy() && !teams[i]->hasServer(excludeServer)) { - if (deterministicRandom().randomInt(0, ++count) == 0) { - idx = i; - } + candidates.push_back(i); + } else if (teams[i]->size() - (teams[i]->hasServer(excludeServer) ? 1 : 0) > 0) { + // If a team has at least one other server besides excludeServer, select it + // as a backup candidate. + backup.push_back(i); } } - if (idx.present()) { - return teams[idx.get()]->getServerIDs(); + + // Prefer a healthy team not containing excludeServer. + if (candidates.size() > 0) { + return teams[deterministicRandom()->randomInt(0, candidates.size())]->getServerIDs(); } + + // The backup choice is a team with at least one server besides excludeServer, in this + // case, the team will be possibily relocated to a healthy destination later by DD. + if (backup.size() > 0) { + std::vector res = teams[deterministicRandom()->randomInt(0, backup.size())]->getServerIDs(); + std::remove(res.begin(), res.end(), excludeServer); + return res; + } + return std::vector(); } From b2afa6e2f5941d42721bf2ac718fb522b8a3d580 Mon Sep 17 00:00:00 2001 From: He Liu Date: Mon, 20 Sep 2021 11:53:27 -0700 Subject: [PATCH 16/19] throw an error is no team can be found for a dropped range --- fdbserver/MoveKeys.actor.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index b3093793c2..dff9a1555e 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1369,7 +1369,12 @@ ACTOR Future removeKeysFromFailedServer(Database cx, // Move the keyrange to teamForDroppedRange if the src list becomes empty, and also remove the shard // from all dest servers. if (src.empty()) { - assert(!teamForDroppedRange.empty()); + if (teamForDroppedRange.empty()) { + TraceEvent(SevError, "ShardLossAllReplicasNoDestinationTeam", serverID) + .detail("Begin", it.key) + .detail("End", keyServers[i + 1].key); + throw internal_error(); + } // Assign the shard to teamForDroppedRange in keyServer space. tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {})); From 95f64f97de2af7e4194ba06e7dd7e17c3d29838e Mon Sep 17 00:00:00 2001 From: He Liu Date: Mon, 20 Sep 2021 11:58:30 -0700 Subject: [PATCH 17/19] resolved some fmt issues --- fdbclient/SystemData.cpp | 6 +++--- fdbserver/MoveKeys.actor.cpp | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index dedcc26588..e8f2c21333 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -273,9 +273,9 @@ std::pair>, vector> addReadWriteDestinations(KeyRangeRef shard, // Returns storage servers selected from 'candidates', who is serving a read-write copy of 'range'. ACTOR Future> pickReadWriteServers(Transaction* tr, std::vector candidates, KeyRangeRef range) { - vector>> serverListEntries; + std::vector>> serverListEntries; for (const UID id : candidates) { serverListEntries.push_back(tr->get(serverListKeyFor(id))); } - vector> serverListValues = wait(getAll(serverListEntries)); + std::vector> serverListValues = wait(getAll(serverListEntries)); std::vector ssis; for (auto& v : serverListValues) { From 914b1ee960ed3616a2b054f2b044c803c9a17f91 Mon Sep 17 00:00:00 2001 From: He Liu Date: Mon, 20 Sep 2021 12:01:47 -0700 Subject: [PATCH 18/19] fmt --- fdbclient/SystemData.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index e8f2c21333..7ddaba4a26 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -273,9 +273,9 @@ std::pair>, vector Date: Wed, 22 Sep 2021 16:55:49 -0700 Subject: [PATCH 19/19] merge fix --- fdbserver/MoveKeys.actor.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 3342c6568b..a7b1899152 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -265,7 +265,7 @@ ACTOR Future> pickReadWriteServers(Transaction* tr, std::vector wait(waitForAll(checks)); - vector result; + std::vector result; for (const auto& it : checks) { if (it.get().present()) { result.push_back(it.get().get()); @@ -1308,11 +1308,10 @@ ACTOR Future removeKeysFromFailedServer(Database cx, std::vector teamForDroppedRange, MoveKeysLock lock, const DDEnabledState* ddEnabledState) { - // state std::vector teamForDroppedRange; state Key begin = allKeys.begin; - state vector src; - state vector dest; + state std::vector src; + state std::vector dest; // Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit while (begin < allKeys.end) { state Transaction tr(cx); @@ -1379,7 +1378,7 @@ ACTOR Future removeKeysFromFailedServer(Database cx, // Assign the shard to teamForDroppedRange in keyServer space. tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {})); - vector> actors; + std::vector> actors; // Unassign the shard from the dest servers. for (const UID& id : dest) {