choose team before removing server
This commit is contained in:
parent
7e53f8662d
commit
fd6d088945
|
@ -893,9 +893,22 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
return Void();
|
||||
}
|
||||
|
||||
Optional<TCTeamInfo> getRandomHealthyTeam() {
|
||||
|
||||
// Returns the server IDs of one randomly selected healthy team from `teams`,
// or an empty vector when no team reports isHealthy().
//
// Selection is a single pass over `teams` using reservoir sampling with a
// reservoir of one: the k-th healthy team encountered replaces the current
// pick when std::rand() % k == 0, so the first healthy team is always taken
// and later ones displace it with decreasing likelihood.
// NOTE(review): this draws from std::rand(); confirm whether the project's
// own random source should be used here instead.
std::vector<UID> getRandomHealthyTeam() {
	int healthySeen = 0; // number of healthy teams visited so far
	int pick = -1; // index into `teams` of the current choice; -1 until a healthy team is seen

	for (int t = 0; t < teams.size(); ++t) {
		if (!teams[t]->isHealthy()) {
			continue;
		}
		++healthySeen;
		if (std::rand() % healthySeen == 0) {
			pick = t;
		}
	}

	if (pick < 0) {
		// No healthy team exists; callers receive an empty server list.
		return {};
	}
	return teams[pick]->getServerIDs();
}
|
||||
|
||||
// SOMEDAY: Make bestTeam better about deciding to leave a shard where it is (e.g. in PRIORITY_TEAM_HEALTHY case)
|
||||
// use keys, src, dest, metrics, priority, system load, etc.. to decide...
|
||||
ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
|
||||
|
@ -6125,6 +6138,13 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
|
|||
trackerCancelled = true;
|
||||
state Error err = e;
|
||||
TraceEvent("DataDistributorDestroyTeamCollections").error(e);
|
||||
state std::vector<UID> teamForDroppedRange;
|
||||
std::vector<UID> pTeam = primaryTeamCollection->getRandomHealthyTeam();
|
||||
teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end());
|
||||
if (configuration.usableRegions > 1) {
|
||||
std::vector<UID> rTeam = remoteTeamCollection->getRandomHealthyTeam();
|
||||
teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end());
|
||||
}
|
||||
self->teamCollection = nullptr;
|
||||
primaryTeamCollection = Reference<DDTeamCollection>();
|
||||
remoteTeamCollection = Reference<DDTeamCollection>();
|
||||
|
@ -6132,7 +6152,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
|
|||
TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err);
|
||||
if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
|
||||
TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
|
||||
wait(removeKeysFromFailedServer(cx, removeFailedServer.getFuture().get(), lock, ddEnabledState));
|
||||
wait(removeKeysFromFailedServer(
|
||||
cx, removeFailedServer.getFuture().get(), teamForDroppedRange, lock, ddEnabledState));
|
||||
Optional<UID> tssPairID;
|
||||
wait(removeStorageServer(cx, removeFailedServer.getFuture().get(), tssPairID, lock, ddEnabledState));
|
||||
} else {
|
||||
|
|
|
@ -1303,9 +1303,10 @@ ACTOR Future<Void> removeStorageServer(Database cx,
|
|||
// Changes to keyServer and serverKey must happen symmetrically in a transaction.
|
||||
ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
||||
UID serverID,
|
||||
std::vector<UID> teamForDroppedRange,
|
||||
MoveKeysLock lock,
|
||||
const DDEnabledState* ddEnabledState) {
|
||||
state std::vector<UID> targetTeam;
|
||||
// state std::vector<UID> teamForDroppedRange;
|
||||
state Key begin = allKeys.begin;
|
||||
|
||||
state vector<UID> src;
|
||||
|
@ -1333,18 +1334,18 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
|
||||
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
|
||||
|
||||
for (int i = 0; i < keyServers.size() && targetTeam.empty(); ++i) {
|
||||
decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest);
|
||||
if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) {
|
||||
targetTeam.insert(targetTeam.end(), dest.begin(), dest.end());
|
||||
}
|
||||
if (!targetTeam.empty()) {
|
||||
break;
|
||||
}
|
||||
if (std::find(src.begin(), src.end(), serverID) == src.end()) {
|
||||
targetTeam.insert(targetTeam.end(), src.begin(), src.end());
|
||||
}
|
||||
}
|
||||
// for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) {
|
||||
// decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest);
|
||||
// if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) {
|
||||
// teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end());
|
||||
// }
|
||||
// if (!teamForDroppedRange.empty()) {
|
||||
// break;
|
||||
// }
|
||||
// if (std::find(src.begin(), src.end(), serverID) == src.end()) {
|
||||
// teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end());
|
||||
// }
|
||||
// }
|
||||
|
||||
state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key);
|
||||
state int i = 0;
|
||||
|
@ -1383,8 +1384,10 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||
// Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all
|
||||
// dest servers.
|
||||
if (src.empty()) {
|
||||
ASSERT(!targetTeam.empty());
|
||||
tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, targetTeam, {}));
|
||||
if (teamForDroppedRange.empty()) {
|
||||
throw internal_error_msg("No team for the dropped range.");
|
||||
}
|
||||
tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {}));
|
||||
vector<Future<Void>> actors;
|
||||
for (const UID& id : dest) {
|
||||
actors.push_back(krmSetRangeCoalescing(&tr,
|
||||
|
@ -1394,7 +1397,7 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||
serverKeysFalse));
|
||||
}
|
||||
// Update serverKeys to include keys.
|
||||
for (const UID& id : targetTeam) {
|
||||
for (const UID& id : teamForDroppedRange) {
|
||||
actors.push_back(krmSetRangeCoalescing(&tr,
|
||||
serverKeysPrefixFor(id),
|
||||
KeyRangeRef(it.key, keyServers[i + 1].key),
|
||||
|
@ -1404,12 +1407,8 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||
TraceEvent(SevWarn, "FailedServerRemoveRange", serverID)
|
||||
.detail("Key", it.key)
|
||||
.detail("OldDest", describe(dest))
|
||||
.detail("NewTeam", describe(targetTeam));
|
||||
.detail("NewTeam", describe(teamForDroppedRange));
|
||||
wait(waitForAll(actors));
|
||||
TraceEvent(SevWarn, "FailedServerRemoveRangeEnd", serverID)
|
||||
.detail("Key", it.key)
|
||||
.detail("OldDest", describe(dest))
|
||||
.detail("NewTeam", describe(targetTeam));
|
||||
} else {
|
||||
TraceEvent("FailedServerSetKey", serverID)
|
||||
.detail("Key", it.key)
|
||||
|
|
|
@ -101,6 +101,7 @@ ACTOR Future<bool> canRemoveStorageServer(Reference<ReadYourWritesTransaction> t
|
|||
// Obviously that could change later!
|
||||
// Declaration of the removeKeysFromFailedServer actor.
//   serverID            - the failed storage server being processed.
//   teamForDroppedRange - server IDs supplied by the caller. In the definition shown in this
//                         change, when a shard's src list is empty the shard is written to
//                         keyServers/ with this list as its source and the range is added to
//                         serverKeys for each of these IDs; the definition throws
//                         internal_error_msg("No team for the dropped range.") if the list is
//                         empty at that point.
//   lock, ddEnabledState - passed through from the caller; their handling is not visible here.
ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
||||
UID serverID,
|
||||
std::vector<UID> teamForDroppedRange,
|
||||
MoveKeysLock lock,
|
||||
const DDEnabledState* ddEnabledState);
|
||||
// Directly removes serverID from serverKeys and keyServers system keyspace.
|
||||
|
|
Loading…
Reference in New Issue