diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index 6a1b52fda9..c5e925e7c8 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -273,8 +273,9 @@ std::pair>, std::vector& serve
 //   as the key, the value indicates whether the shard does or does not exist on the server.
 //   These values can be changed as data movement occurs.
 extern const KeyRef serverKeysPrefix;
-extern const ValueRef serverKeysTrue, serverKeysFalse;
+extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
 const Key serverKeysKey(UID serverID, const KeyRef& keys);
 const Key serverKeysPrefixFor(UID serverID);
 UID serverKeysDecodeServer(const KeyRef& key);
diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index 81e4fe9a2d..67dc883fa6 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -357,6 +357,10 @@ public:
     void addref() override { ReferenceCounted<TCTeamInfo>::addref(); }
     void delref() override { ReferenceCounted<TCTeamInfo>::delref(); }
 
+    bool hasServer(const UID& server) {
+        return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end();
+    }
+
     void addServers(const std::vector<UID>& servers) override {
         serverIDs.reserve(servers.size());
         for (int i = 0; i < servers.size(); i++) {
@@ -893,6 +897,35 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
         return Void();
     }
 
+    // Returns a random healthy team, which does not contain excludeServer.
+    std::vector<UID> getRandomHealthyTeam(const UID& excludeServer) {
+        std::vector<int> candidates, backup;
+        for (int i = 0; i < teams.size(); ++i) {
+            if (teams[i]->isHealthy() && !teams[i]->hasServer(excludeServer)) {
+                candidates.push_back(i);
+            } else if (teams[i]->size() - (teams[i]->hasServer(excludeServer) ? 1 : 0) > 0) {
+                // If a team has at least one other server besides excludeServer, select it
+                // as a backup candidate.
+                backup.push_back(i);
+            }
+        }
+
+        // Prefer a healthy team not containing excludeServer.
+        if (candidates.size() > 0) {
+            return teams[candidates[deterministicRandom()->randomInt(0, candidates.size())]]->getServerIDs();
+        }
+
+        // The backup choice is a team with at least one server besides excludeServer; in this
+        // case, the team will possibly be relocated to a healthy destination later by DD.
+        if (backup.size() > 0) {
+            std::vector<UID> res = teams[backup[deterministicRandom()->randomInt(0, backup.size())]]->getServerIDs();
+            res.erase(std::remove(res.begin(), res.end(), excludeServer), res.end());
+            return res;
+        }
+
+        return std::vector<UID>();
+    }
+
     // SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case)
     // use keys, src, dest, metrics, priority, system load, etc.. to decide...
     ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
@@ -6152,6 +6185,17 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
         trackerCancelled = true;
         state Error err = e;
         TraceEvent("DataDistributorDestroyTeamCollections").error(e);
+        state std::vector<UID> teamForDroppedRange;
+        if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
+            // Choose a random healthy team to host the to-be-dropped range.
+            const UID serverID = removeFailedServer.getFuture().get();
+            std::vector<UID> pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID);
+            teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end());
+            if (configuration.usableRegions > 1) {
+                std::vector<UID> rTeam = remoteTeamCollection->getRandomHealthyTeam(serverID);
+                teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end());
+            }
+        }
         self->teamCollection = nullptr;
         primaryTeamCollection = Reference<DDTeamCollection>();
         remoteTeamCollection = Reference<DDTeamCollection>();
@@ -6159,7 +6203,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
         TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err);
         if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
             TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
-            wait(removeKeysFromFailedServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
+            wait(removeKeysFromFailedServer(
+                cx, removeFailedServer.getFuture().get(), teamForDroppedRange, lock, ddEnabledState));
             Optional<UID> tssPairID;
             wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), tssPairID, lock, ddEnabledState));
         } else {
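For review purposes, here is a minimal standalone sketch of the selection policy that getRandomHealthyTeam() implements: prefer a random healthy team that does not contain the failed server, otherwise fall back to any team that still has at least one other member and return it with the failed server filtered out. The names `Team` and `selectTeamForDroppedRange` are hypothetical stand-ins (not TCTeamInfo or the DDTeamCollection method), and `std::mt19937` stands in for deterministicRandom().

```cpp
// Sketch only: Team and selectTeamForDroppedRange are hypothetical stand-ins,
// not FoundationDB types; std::mt19937 replaces deterministicRandom().
#include <algorithm>
#include <random>
#include <string>
#include <vector>

struct Team {
    std::vector<std::string> serverIDs;
    bool healthy = false;

    bool hasServer(const std::string& id) const {
        return std::find(serverIDs.begin(), serverIDs.end(), id) != serverIDs.end();
    }
};

// Prefer a random healthy team that does not contain the failed server; otherwise fall back to
// any team that still has at least one other member and return it with the failed server removed.
std::vector<std::string> selectTeamForDroppedRange(const std::vector<Team>& teams,
                                                   const std::string& failedServer,
                                                   std::mt19937& rng) {
    std::vector<size_t> candidates, backup;
    for (size_t i = 0; i < teams.size(); ++i) {
        if (teams[i].healthy && !teams[i].hasServer(failedServer)) {
            candidates.push_back(i);
        } else if (teams[i].serverIDs.size() > (teams[i].hasServer(failedServer) ? 1u : 0u)) {
            backup.push_back(i);
        }
    }

    auto pick = [&rng](const std::vector<size_t>& v) {
        return v[std::uniform_int_distribution<size_t>(0, v.size() - 1)(rng)];
    };

    if (!candidates.empty()) {
        return teams[pick(candidates)].serverIDs;
    }
    if (!backup.empty()) {
        // A backup team may be unhealthy; DD can relocate the shard to a healthy destination later.
        std::vector<std::string> res = teams[pick(backup)].serverIDs;
        res.erase(std::remove(res.begin(), res.end(), failedServer), res.end());
        return res;
    }
    return {}; // No usable team; removeKeysFromFailedServer treats an empty team as an error.
}
```

In the error path above, the primary team's server IDs and, when usableRegions > 1, the remote team's server IDs are concatenated into teamForDroppedRange before the team collections are torn down.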
diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp
index 4eff94d4ae..a7b1899152 100644
--- a/fdbserver/MoveKeys.actor.cpp
+++ b/fdbserver/MoveKeys.actor.cpp
@@ -236,6 +236,45 @@ ACTOR Future<std::vector<UID>> addReadWriteDestinations(KeyRangeRef shard,
     return result;
 }
 
+// Returns the storage servers in 'candidates' that are serving a read-write copy of 'range'.
+ACTOR Future<std::vector<UID>> pickReadWriteServers(Transaction* tr, std::vector<UID> candidates, KeyRangeRef range) {
+    std::vector<Future<Optional<Value>>> serverListEntries;
+
+    for (const UID id : candidates) {
+        serverListEntries.push_back(tr->get(serverListKeyFor(id)));
+    }
+
+    std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
+
+    std::vector<StorageServerInterface> ssis;
+    for (auto& v : serverListValues) {
+        ssis.push_back(decodeServerListValue(v.get()));
+    }
+
+    state std::vector<Future<Optional<UID>>> checks;
+    checks.reserve(ssis.size());
+    for (auto& ssi : ssis) {
+        checks.push_back(checkReadWrite(
+            ssi.getShardState.getReplyUnlessFailedFor(GetShardStateRequest(range, GetShardStateRequest::NO_WAIT),
+                                                      SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL,
+                                                      0,
+                                                      TaskPriority::MoveKeys),
+            ssi.id(),
+            0));
+    }
+
+    wait(waitForAll(checks));
+
+    std::vector<UID> result;
+    for (const auto& it : checks) {
+        if (it.get().present()) {
+            result.push_back(it.get().get());
+        }
+    }
+
+    return result;
+}
+
 ACTOR Future<std::vector<std::vector<UID>>> additionalSources(RangeResult shards,
                                                               Reference<ReadYourWritesTransaction> tr,
                                                               int desiredHealthy,
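The helper above is used by removeKeysFromFailedServer() below: when removing the failed server would leave a shard with no source, any destination server that already holds a read-write copy of the range is promoted into the source list. A simplified, non-transactional sketch of that repair step follows; the names `promoteReadWriteDests` and `isReadWriteCopy` are hypothetical, whereas the real code issues GetShardStateRequest checks via pickReadWriteServers inside the transaction.

```cpp
#include <functional>
#include <string>
#include <vector>

using UID = std::string; // illustrative stand-in for FDB's UID

// If src would become empty, promote every dest server that already serves a read-write
// copy of the shard into src. Returns true if the shard still has at least one source.
bool promoteReadWriteDests(std::vector<UID>& src,
                           std::vector<UID>& dest,
                           const std::function<bool(const UID&)>& isReadWriteCopy) {
    if (src.empty() && !dest.empty()) {
        for (auto it = dest.begin(); it != dest.end();) {
            if (isReadWriteCopy(*it)) {
                src.push_back(*it);  // the dest server becomes a usable source
                it = dest.erase(it); // and is no longer tracked as a destination
            } else {
                ++it;
            }
        }
    }
    return !src.empty();
}
```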
@@ -1262,11 +1301,17 @@ ACTOR Future<Void> removeStorageServer(Database cx,
 }
 // Remove the server from keyServer list and set serverKeysFalse to the server's serverKeys list.
 // Changes to keyServer and serverKey must happen symmetrically in a transaction.
+// If serverID is the last source server for a shard, the shard will be erased and then assigned
+// to teamForDroppedRange.
 ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                                               UID serverID,
+                                              std::vector<UID> teamForDroppedRange,
                                               MoveKeysLock lock,
                                               const DDEnabledState* ddEnabledState) {
     state Key begin = allKeys.begin;
+
+    state std::vector<UID> src;
+    state std::vector<UID> dest;
     // Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit
     while (begin < allKeys.end) {
         state Transaction tr(cx);
@@ -1290,10 +1335,9 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                                                         SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
                                                         SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
             state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key);
-            for (int i = 0; i < keyServers.size() - 1; ++i) {
-                auto it = keyServers[i];
-                std::vector<UID> src;
-                std::vector<UID> dest;
+            state int i = 0;
+            for (; i < keyServers.size() - 1; ++i) {
+                state KeyValueRef it = keyServers[i];
                 decodeKeyServersValue(UIDtoTagMap, it.value, src, dest);
 
                 // The failed server is not present
@@ -1306,11 +1350,71 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                 // Dest is usually empty, but keep this in case there is parallel data movement
                 src.erase(std::remove(src.begin(), src.end(), serverID), src.end());
                 dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end());
-                TraceEvent(SevDebug, "FailedServerSetKey", serverID)
-                    .detail("Key", it.key)
-                    .detail("ValueSrc", describe(src))
-                    .detail("ValueDest", describe(dest));
-                tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest));
+
+                // If the last src server is to be removed, first check whether any dest servers are
+                // hosting a read-write copy of the keyrange, and move such dest servers to the src list.
+                if (src.empty() && !dest.empty()) {
+                    std::vector<UID> newSources =
+                        wait(pickReadWriteServers(&tr, dest, KeyRangeRef(it.key, keyServers[i + 1].key)));
+                    for (const UID& id : newSources) {
+                        TraceEvent(SevWarn, "FailedServerAdditionalSourceServer", serverID)
+                            .detail("Key", it.key)
+                            .detail("NewSourceServerFromDest", id);
+                        dest.erase(std::remove(dest.begin(), dest.end(), id), dest.end());
+                        src.push_back(id);
+                    }
+                }
+
+                // Move the keyrange to teamForDroppedRange if the src list becomes empty, and also remove the shard
+                // from all dest servers.
+                if (src.empty()) {
+                    if (teamForDroppedRange.empty()) {
+                        TraceEvent(SevError, "ShardLossAllReplicasNoDestinationTeam", serverID)
+                            .detail("Begin", it.key)
+                            .detail("End", keyServers[i + 1].key);
+                        throw internal_error();
+                    }
+
+                    // Assign the shard to teamForDroppedRange in keyServer space.
+                    tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {}));
+
+                    std::vector<Future<Void>> actors;
+
+                    // Unassign the shard from the dest servers.
+                    for (const UID& id : dest) {
+                        actors.push_back(krmSetRangeCoalescing(&tr,
+                                                               serverKeysPrefixFor(id),
+                                                               KeyRangeRef(it.key, keyServers[i + 1].key),
+                                                               allKeys,
+                                                               serverKeysFalse));
+                    }
+
+                    // Assign the shard to the new team as an empty range.
+                    // Note: there could be data loss.
+                    for (const UID& id : teamForDroppedRange) {
+                        actors.push_back(krmSetRangeCoalescing(&tr,
+                                                               serverKeysPrefixFor(id),
+                                                               KeyRangeRef(it.key, keyServers[i + 1].key),
+                                                               allKeys,
+                                                               serverKeysTrueEmptyRange));
+                    }
+
+                    wait(waitForAll(actors));
+
+                    TraceEvent trace(SevWarnAlways, "ShardLossAllReplicasDropShard", serverID);
+                    trace.detail("Begin", it.key);
+                    trace.detail("End", keyServers[i + 1].key);
+                    if (!dest.empty()) {
+                        trace.detail("DropedDest", describe(dest));
+                    }
+                    trace.detail("NewTeamForDroppedShard", describe(teamForDroppedRange));
+                } else {
+                    TraceEvent(SevDebug, "FailedServerSetKey", serverID)
+                        .detail("Key", it.key)
+                        .detail("ValueSrc", describe(src))
+                        .detail("ValueDest", describe(dest));
+                    tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest));
+                }
             }
 
             // Set entire range for our serverID in serverKeys keyspace to false to signal erasure
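To summarize the drop path above: when a shard has lost every source, a single transaction rewrites the shard's keyServers entry to the chosen team (with an empty dest list), clears the shard from the old dest servers' serverKeys space, and assigns it to every member of the new team as serverKeysTrueEmptyRange. The sketch below only enumerates the intended writes; the key and value strings are illustrative placeholders, not the real keyServersValue/krmSetRangeCoalescing encodings.

```cpp
#include <string>
#include <vector>

using UID = std::string; // illustrative stand-in for FDB's UID

struct PlannedWrite {
    std::string key;
    std::string value;
};

// Enumerates the per-shard writes keyed by the shard's begin key; the real code writes
// whole ranges via krmSetRangeCoalescing rather than single keys.
std::vector<PlannedWrite> planDroppedShardWrites(const std::string& begin,
                                                 const std::vector<UID>& oldDest,
                                                 const std::vector<UID>& newTeam) {
    std::vector<PlannedWrite> writes;
    // Reassign ownership of the shard in keyServers space, with no pending destination.
    writes.push_back({ "keyServers/" + begin, "src = <newTeam>, dest = <empty>" });
    // Unassign the shard from the old dest servers.
    for (const UID& id : oldDest) {
        writes.push_back({ "serverKeys/" + id + "/" + begin, "serverKeysFalse" });
    }
    // Mark the range as a known-empty assignment for every member of the new team.
    for (const UID& id : newTeam) {
        writes.push_back({ "serverKeys/" + id + "/" + begin, "serverKeysTrueEmptyRange" });
    }
    return writes;
}
```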
diff --git a/fdbserver/MoveKeys.actor.h b/fdbserver/MoveKeys.actor.h
index ded491d006..220a940f25 100644
--- a/fdbserver/MoveKeys.actor.h
+++ b/fdbserver/MoveKeys.actor.h
@@ -101,6 +101,7 @@ ACTOR Future<bool> canRemoveStorageServer(Reference<ReadYourWritesTransaction> t
 // Obviously that could change later!
 ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                                               UID serverID,
+                                              std::vector<UID> teamForDroppedRange,
                                               MoveKeysLock lock,
                                               const DDEnabledState* ddEnabledState);
 // Directly removes serverID from serverKeys and keyServers system keyspace.
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 697019e50d..742563e8f5 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -3322,6 +3322,9 @@ void changeServerKeys(StorageServer* data,
         } else if (!dataAvailable) {
             // SOMEDAY: Avoid restarting adding/transferred shards
             if (version == 0) { // bypass fetchkeys; shard is known empty at version 0
+                TraceEvent("ChangeServerKeysAddEmptyRange", data->thisServerID)
+                    .detail("Begin", range.begin)
+                    .detail("End", range.end);
                 changeNewestAvailable.emplace_back(range, latestVersion);
                 data->addShard(ShardInfo::newReadWrite(range, data));
                 setAvailableStatus(data, range, true);
@@ -3472,6 +3475,7 @@ private:
     KeyRef startKey;
     bool nowAssigned;
+    bool emptyRange;
     bool processedStartKey;
 
     KeyRef cacheStartKey;
@@ -3494,7 +3498,10 @@ private:
             // The changes for version have already been received (and are being processed now). We need to fetch
             // the data for change.version-1 (changes from versions < change.version)
-            changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE);
+            // If emptyRange, treat the shard as empty; see removeKeysFromFailedServer() for more details about this
+            // scenario.
+            const Version shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1;
+            changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE);
         }
 
         processedStartKey = false;
@@ -3504,6 +3511,7 @@ private:
             // keys
             startKey = m.param1;
             nowAssigned = m.param2 != serverKeysFalse;
+            emptyRange = m.param2 == serverKeysTrueEmptyRange;
             processedStartKey = true;
         } else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
             // lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
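On the storage server, the behavioral change is confined to which version is passed to changeServerKeys() when a serverKeys private mutation is applied: an assignment tagged serverKeysTrueEmptyRange is treated as a shard known to be empty at version 0, which takes the existing bypass-fetchKeys branch instead of fetching data at currentVersion - 1. A tiny standalone restatement follows; `shardVersionForAssignment` is a hypothetical helper name, not an FDB function.

```cpp
#include <cassert>
#include <cstdint>

using Version = int64_t; // stand-in for FDB's Version

// Version 0 signals a known-empty shard to changeServerKeys(), so fetchKeys is bypassed and
// the range becomes read-write immediately; any other assignment fetches data for version - 1.
Version shardVersionForAssignment(bool nowAssigned, bool emptyRange, Version currentVersion) {
    return (emptyRange && nowAssigned) ? 0 : currentVersion - 1;
}

int main() {
    assert(shardVersionForAssignment(true, true, 100) == 0);   // dropped shard reassigned as empty
    assert(shardVersionForAssignment(true, false, 100) == 99); // normal assignment: fetch at version 99
    assert(shardVersionForAssignment(false, true, 100) == 99); // unassignment ignores the empty-range flag
    return 0;
}
```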