Merge pull request #5524 from liquid-helium/improve-exlude-failed-servers
Check whether the src server list would become empty before removing a failed storage server (SS)
commit c99ce19ad7
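For orientation before the hunks: a minimal, self-contained C++ sketch of the per-shard logic this change introduces. This is an editor's illustration, not FDB code; `UID` is reduced to an integer, and `excludeFailedServer`, `readWriteDests`, and `droppedAsEmptyRange` are hypothetical names. The real implementation is `removeKeysFromFailedServer()` in the diff below.

```cpp
#include <algorithm>
#include <utility>
#include <vector>

// Editor's sketch: after erasing the failed server from src/dest, promote dest
// servers that already hold a read-write copy; only if src is still empty is the
// shard handed to a pre-chosen healthy team as an empty range
// (serverKeysTrueEmptyRange in the real diff below).
using UID = unsigned long long;

struct ShardFixup {
    std::vector<UID> src, dest;
    bool droppedAsEmptyRange = false; // all replicas lost; data loss possible
};

ShardFixup excludeFailedServer(std::vector<UID> src,
                               std::vector<UID> dest,
                               UID failedServer,
                               const std::vector<UID>& readWriteDests,        // dests verified read-write
                               const std::vector<UID>& teamForDroppedRange) { // pre-selected healthy team
    src.erase(std::remove(src.begin(), src.end(), failedServer), src.end());
    dest.erase(std::remove(dest.begin(), dest.end(), failedServer), dest.end());

    // The failed server was the last source: promote verified read-write dest servers.
    if (src.empty() && !dest.empty()) {
        for (UID id : readWriteDests) {
            dest.erase(std::remove(dest.begin(), dest.end(), id), dest.end());
            src.push_back(id);
        }
    }

    ShardFixup r;
    if (src.empty()) {
        // No replica left at all: reassign the shard to the healthy team as an empty range.
        r.src = teamForDroppedRange;
        r.droppedAsEmptyRange = true;
    } else {
        r.src = std::move(src);
        r.dest = std::move(dest);
    }
    return r;
}
```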
@@ -273,8 +273,9 @@ std::pair<std::vector<std::pair<UID, NetworkAddress>>, std::vector<std::pair<UID
     return std::make_pair(logs, oldLogs);
 }
 
-const KeyRef serverKeysPrefix = LiteralStringRef("\xff/serverKeys/");
-const ValueRef serverKeysTrue = LiteralStringRef("1"), // compatible with what was serverKeysTrue
+const KeyRef serverKeysPrefix = "\xff/serverKeys/"_sr;
+const ValueRef serverKeysTrue = "1"_sr, // compatible with what was serverKeysTrue
+               serverKeysTrueEmptyRange = "3"_sr, // the server treats the range as empty.
                serverKeysFalse;
 
 const Key serverKeysKey(UID serverID, const KeyRef& key) {
@@ -299,7 +300,7 @@ UID serverKeysDecodeServer(const KeyRef& key) {
     return server_id;
 }
 bool serverHasKey(ValueRef storedValue) {
-    return storedValue == serverKeysTrue;
+    return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange;
 }
 
 const KeyRef cacheKeysPrefix = LiteralStringRef("\xff\x02/cacheKeys/");
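As a side note on the new marker value, a minimal sketch (editor's illustration; `std::string` stands in for FDB's `ValueRef`) of how `serverHasKey` now accepts both the normal assigned marker "1" and the new empty-range marker "3":

```cpp
#include <cassert>
#include <string>

// Editor's sketch, not FDB code: std::string stands in for ValueRef.
int main() {
    const std::string serverKeysTrue = "1";
    const std::string serverKeysTrueEmptyRange = "3"; // the server treats the range as empty
    const std::string serverKeysFalse = "";           // default-constructed value

    auto serverHasKey = [&](const std::string& storedValue) {
        return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange;
    };

    assert(serverHasKey(serverKeysTrue));
    assert(serverHasKey(serverKeysTrueEmptyRange)); // new: empty-range assignment still counts
    assert(!serverHasKey(serverKeysFalse));
    return 0;
}
```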
@@ -89,7 +89,7 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector<uint16_t>& serve
 // as the key, the value indicates whether the shard does or does not exist on the server.
 // These values can be changed as data movement occurs.
 extern const KeyRef serverKeysPrefix;
-extern const ValueRef serverKeysTrue, serverKeysFalse;
+extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
 const Key serverKeysKey(UID serverID, const KeyRef& keys);
 const Key serverKeysPrefixFor(UID serverID);
 UID serverKeysDecodeServer(const KeyRef& key);
@@ -357,6 +357,10 @@ public:
     void addref() override { ReferenceCounted<TCTeamInfo>::addref(); }
     void delref() override { ReferenceCounted<TCTeamInfo>::delref(); }
 
+    bool hasServer(const UID& server) {
+        return std::find(serverIDs.begin(), serverIDs.end(), server) != serverIDs.end();
+    }
+
     void addServers(const std::vector<UID>& servers) override {
         serverIDs.reserve(servers.size());
         for (int i = 0; i < servers.size(); i++) {
@@ -893,6 +897,35 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
         return Void();
     }
 
+    // Returns a random healthy team, which does not contain excludeServer.
+    std::vector<UID> getRandomHealthyTeam(const UID& excludeServer) {
+        std::vector<int> candidates, backup;
+        for (int i = 0; i < teams.size(); ++i) {
+            if (teams[i]->isHealthy() && !teams[i]->hasServer(excludeServer)) {
+                candidates.push_back(i);
+            } else if (teams[i]->size() - (teams[i]->hasServer(excludeServer) ? 1 : 0) > 0) {
+                // If a team has at least one other server besides excludeServer, select it
+                // as a backup candidate.
+                backup.push_back(i);
+            }
+        }
+
+        // Prefer a healthy team not containing excludeServer.
+        if (candidates.size() > 0) {
+            return teams[candidates[deterministicRandom()->randomInt(0, candidates.size())]]->getServerIDs();
+        }
+
+        // The backup choice is a team with at least one server besides excludeServer; in this
+        // case, the team will possibly be relocated to a healthy destination later by DD.
+        if (backup.size() > 0) {
+            std::vector<UID> res = teams[backup[deterministicRandom()->randomInt(0, backup.size())]]->getServerIDs();
+            res.erase(std::remove(res.begin(), res.end(), excludeServer), res.end());
+            return res;
+        }
+
+        return std::vector<UID>();
+    }
+
     // SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case)
     // use keys, src, dest, metrics, priority, system load, etc.. to decide...
     ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
@@ -6152,6 +6185,17 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
         trackerCancelled = true;
         state Error err = e;
         TraceEvent("DataDistributorDestroyTeamCollections").error(e);
+        state std::vector<UID> teamForDroppedRange;
+        if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
+            // Choose a random healthy team to host the to-be-dropped range.
+            const UID serverID = removeFailedServer.getFuture().get();
+            std::vector<UID> pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID);
+            teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end());
+            if (configuration.usableRegions > 1) {
+                std::vector<UID> rTeam = remoteTeamCollection->getRandomHealthyTeam(serverID);
+                teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end());
+            }
+        }
         self->teamCollection = nullptr;
         primaryTeamCollection = Reference<DDTeamCollection>();
         remoteTeamCollection = Reference<DDTeamCollection>();
@@ -6159,7 +6203,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
         TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err);
         if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
             TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
-            wait(removeKeysFromFailedServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
+            wait(removeKeysFromFailedServer(
+                cx, removeFailedServer.getFuture().get(), teamForDroppedRange, lock, ddEnabledState));
             Optional<UID> tssPairID;
             wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), tssPairID, lock, ddEnabledState));
         } else {
@@ -236,6 +236,45 @@ ACTOR Future<std::vector<UID>> addReadWriteDestinations(KeyRangeRef shard,
     return result;
 }
 
+// Returns the storage servers from 'candidates' that are serving a read-write copy of 'range'.
+ACTOR Future<std::vector<UID>> pickReadWriteServers(Transaction* tr, std::vector<UID> candidates, KeyRangeRef range) {
+    std::vector<Future<Optional<Value>>> serverListEntries;
+
+    for (const UID id : candidates) {
+        serverListEntries.push_back(tr->get(serverListKeyFor(id)));
+    }
+
+    std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
+
+    std::vector<StorageServerInterface> ssis;
+    for (auto& v : serverListValues) {
+        ssis.push_back(decodeServerListValue(v.get()));
+    }
+
+    state std::vector<Future<Optional<UID>>> checks;
+    checks.reserve(ssis.size());
+    for (auto& ssi : ssis) {
+        checks.push_back(checkReadWrite(
+            ssi.getShardState.getReplyUnlessFailedFor(GetShardStateRequest(range, GetShardStateRequest::NO_WAIT),
+                                                      SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL,
+                                                      0,
+                                                      TaskPriority::MoveKeys),
+            ssi.id(),
+            0));
+    }
+
+    wait(waitForAll(checks));
+
+    std::vector<UID> result;
+    for (const auto& it : checks) {
+        if (it.get().present()) {
+            result.push_back(it.get().get());
+        }
+    }
+
+    return result;
+}
+
 ACTOR Future<std::vector<std::vector<UID>>> additionalSources(RangeResult shards,
                                                               Reference<ReadYourWritesTransaction> tr,
                                                               int desiredHealthy,
@@ -1262,11 +1301,17 @@ ACTOR Future<Void> removeStorageServer(Database cx,
 }
 // Remove the server from keyServer list and set serverKeysFalse to the server's serverKeys list.
 // Changes to keyServer and serverKey must happen symmetrically in a transaction.
+// If serverID is the last source server for a shard, the shard will be erased and then assigned
+// to teamForDroppedRange.
 ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                                               UID serverID,
+                                              std::vector<UID> teamForDroppedRange,
                                               MoveKeysLock lock,
                                               const DDEnabledState* ddEnabledState) {
     state Key begin = allKeys.begin;
+
+    state std::vector<UID> src;
+    state std::vector<UID> dest;
     // Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit
     while (begin < allKeys.end) {
         state Transaction tr(cx);
@@ -1290,10 +1335,9 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                                                SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
                                                SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
             state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key);
-            for (int i = 0; i < keyServers.size() - 1; ++i) {
-                auto it = keyServers[i];
-                std::vector<UID> src;
-                std::vector<UID> dest;
+            state int i = 0;
+            for (; i < keyServers.size() - 1; ++i) {
+                state KeyValueRef it = keyServers[i];
                 decodeKeyServersValue(UIDtoTagMap, it.value, src, dest);
 
                 // The failed server is not present
@@ -1306,11 +1350,71 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                 // Dest is usually empty, but keep this in case there is parallel data movement
                 src.erase(std::remove(src.begin(), src.end(), serverID), src.end());
                 dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end());
-                TraceEvent(SevDebug, "FailedServerSetKey", serverID)
-                    .detail("Key", it.key)
-                    .detail("ValueSrc", describe(src))
-                    .detail("ValueDest", describe(dest));
-                tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest));
+
+                // If the last src server is to be removed, first check whether any dest servers are
+                // hosting a read-write copy of the keyrange, and move such dest servers to the src list.
+                if (src.empty() && !dest.empty()) {
+                    std::vector<UID> newSources =
+                        wait(pickReadWriteServers(&tr, dest, KeyRangeRef(it.key, keyServers[i + 1].key)));
+                    for (const UID& id : newSources) {
+                        TraceEvent(SevWarn, "FailedServerAdditionalSourceServer", serverID)
+                            .detail("Key", it.key)
+                            .detail("NewSourceServerFromDest", id);
+                        dest.erase(std::remove(dest.begin(), dest.end(), id), dest.end());
+                        src.push_back(id);
+                    }
+                }
+
+                // Move the keyrange to teamForDroppedRange if the src list becomes empty, and also remove the shard
+                // from all dest servers.
+                if (src.empty()) {
+                    if (teamForDroppedRange.empty()) {
+                        TraceEvent(SevError, "ShardLossAllReplicasNoDestinationTeam", serverID)
+                            .detail("Begin", it.key)
+                            .detail("End", keyServers[i + 1].key);
+                        throw internal_error();
+                    }
+
+                    // Assign the shard to teamForDroppedRange in keyServer space.
+                    tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {}));
+
+                    std::vector<Future<Void>> actors;
+
+                    // Unassign the shard from the dest servers.
+                    for (const UID& id : dest) {
+                        actors.push_back(krmSetRangeCoalescing(&tr,
+                                                               serverKeysPrefixFor(id),
+                                                               KeyRangeRef(it.key, keyServers[i + 1].key),
+                                                               allKeys,
+                                                               serverKeysFalse));
+                    }
+
+                    // Assign the shard to the new team as an empty range.
+                    // Note: there could be data loss.
+                    for (const UID& id : teamForDroppedRange) {
+                        actors.push_back(krmSetRangeCoalescing(&tr,
+                                                               serverKeysPrefixFor(id),
+                                                               KeyRangeRef(it.key, keyServers[i + 1].key),
+                                                               allKeys,
+                                                               serverKeysTrueEmptyRange));
+                    }
+
+                    wait(waitForAll(actors));
+
+                    TraceEvent trace(SevWarnAlways, "ShardLossAllReplicasDropShard", serverID);
+                    trace.detail("Begin", it.key);
+                    trace.detail("End", keyServers[i + 1].key);
+                    if (!dest.empty()) {
+                        trace.detail("DropedDest", describe(dest));
+                    }
+                    trace.detail("NewTeamForDroppedShard", describe(teamForDroppedRange));
+                } else {
+                    TraceEvent(SevDebug, "FailedServerSetKey", serverID)
+                        .detail("Key", it.key)
+                        .detail("ValueSrc", describe(src))
+                        .detail("ValueDest", describe(dest));
+                    tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, src, dest));
+                }
             }
 
             // Set entire range for our serverID in serverKeys keyspace to false to signal erasure
@@ -101,6 +101,7 @@ ACTOR Future<bool> canRemoveStorageServer(Reference<ReadYourWritesTransaction> t
 // Obviously that could change later!
 ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
                                               UID serverID,
+                                              std::vector<UID> teamForDroppedRange,
                                               MoveKeysLock lock,
                                               const DDEnabledState* ddEnabledState);
 // Directly removes serverID from serverKeys and keyServers system keyspace.
@@ -3322,6 +3322,9 @@ void changeServerKeys(StorageServer* data,
     } else if (!dataAvailable) {
         // SOMEDAY: Avoid restarting adding/transferred shards
         if (version == 0) { // bypass fetchkeys; shard is known empty at version 0
+            TraceEvent("ChangeServerKeysAddEmptyRange", data->thisServerID)
+                .detail("Begin", range.begin)
+                .detail("End", range.end);
             changeNewestAvailable.emplace_back(range, latestVersion);
             data->addShard(ShardInfo::newReadWrite(range, data));
             setAvailableStatus(data, range, true);
@@ -3472,6 +3475,7 @@ private:
 
     KeyRef startKey;
     bool nowAssigned;
+    bool emptyRange;
     bool processedStartKey;
 
     KeyRef cacheStartKey;
@@ -3494,7 +3498,10 @@ private:
 
             // The changes for version have already been received (and are being processed now). We need to fetch
             // the data for change.version-1 (changes from versions < change.version)
-            changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE);
+            // If emptyRange, treat the shard as empty; see removeKeysFromFailedServer() for more details about this
+            // scenario.
+            const Version shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1;
+            changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE);
         }
 
         processedStartKey = false;
@@ -3504,6 +3511,7 @@ private:
             // keys
             startKey = m.param1;
             nowAssigned = m.param2 != serverKeysFalse;
+            emptyRange = m.param2 == serverKeysTrueEmptyRange;
             processedStartKey = true;
         } else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
             // lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)
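Tying the storage-server hunks together: the private serverKeys mutation sets `emptyRange` when the value is the "3" marker, and an assigned empty range is applied at version 0, which the existing `version == 0` branch treats as a known-empty shard (skip fetchKeys and mark it readable immediately). A simplified sketch under those assumptions (editor's illustration with hypothetical names, not the storage server's real types):

```cpp
#include <cstdint>
#include <string>

// Editor's sketch, not storageserver.actor.cpp: models only the decision made
// in the hunks above. Version is reduced to int64_t and the serverKeys value to
// std::string.
struct AssignmentDecision {
    bool nowAssigned;
    int64_t applyAtVersion; // 0 means "known empty": bypass fetchKeys, shard is readable at once
};

AssignmentDecision decodeServerKeysAssignment(const std::string& value, int64_t currentVersion) {
    const std::string serverKeysFalse = "";
    const std::string serverKeysTrueEmptyRange = "3";

    AssignmentDecision d;
    d.nowAssigned = (value != serverKeysFalse);
    const bool emptyRange = (value == serverKeysTrueEmptyRange);
    // Mirrors: shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1;
    d.applyAtVersion = (emptyRange && d.nowAssigned) ? 0 : currentVersion - 1;
    return d;
}
```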