fixed removeKeys impl, adjusted test workload, and introduced extra safety checks to NativeAPI and proxy
This commit is contained in:
parent
a07cf5d038
commit
00c2025d4b
|
@ -2053,7 +2053,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
if (!safe) {
|
||||
std::string errorStr =
|
||||
"ERROR: It is unsafe to exclude the specified servers at this time.\n"
|
||||
"Please try the exclude again in 30 seconds.\n"
|
||||
"Please check that this exclusion does not bring down an entire server team.\n"
|
||||
"Please also ensure that the exclusion will keep a majority of coordinators alive.\n"
|
||||
"Type `exclude FORCE permanent <ADDRESS>*' to exclude without performing safety checks.\n";
|
||||
printf("%s", errorStr.c_str());
|
||||
return true;
|
||||
|
|
|
@ -3433,7 +3433,40 @@ ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions) {
|
||||
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions) {
|
||||
ExclusionSafetyCheckRequest req(exclusions);
|
||||
return loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::exclusionSafetyCheckReq, req, cx->taskID);
|
||||
state bool ddCheck =
|
||||
wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::exclusionSafetyCheckReq, req, cx->taskID));
|
||||
state ClientCoordinators coordinatorList(cx->getConnectionFile());
|
||||
state vector<Future<Optional<LeaderInfo>>> leaderServers;
|
||||
for (int i = 0; i < coordinatorList.clientLeaderServers.size(); i++) {
|
||||
leaderServers.push_back(retryBrokenPromise(coordinatorList.clientLeaderServers[i].getLeader,
|
||||
GetLeaderRequest(coordinatorList.clusterKey, UID()),
|
||||
TaskPriority::CoordinationReply));
|
||||
}
|
||||
wait(smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) || delay(2.0));
|
||||
int attemptCoordinatorExclude = 0;
|
||||
int coordinatorsUnavailable = 0;
|
||||
for (int i = 0; i < leaderServers.size(); i++) {
|
||||
NetworkAddress leaderAddress =
|
||||
coordinatorList.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress();
|
||||
if (leaderServers[i].isReady()) {
|
||||
if ((std::count(exclusions.begin(), exclusions.end(),
|
||||
AddressExclusion(leaderAddress.ip, leaderAddress.port)) ||
|
||||
std::count(exclusions.begin(), exclusions.end(), AddressExclusion(leaderAddress.ip)))) {
|
||||
attemptCoordinatorExclude++;
|
||||
}
|
||||
} else {
|
||||
coordinatorsUnavailable++;
|
||||
}
|
||||
}
|
||||
int faultTolerance = (leaderServers.size() - 1) / 2 - coordinatorsUnavailable;
|
||||
TraceEvent("ExclusionSafetyCheck")
|
||||
.detail("CoordinatorListSize", leaderServers.size())
|
||||
.detail("NumExclusions", exclusions.size())
|
||||
.detail("FaultTolerance", faultTolerance)
|
||||
.detail("AttemptCoordinatorExclude", attemptCoordinatorExclude);
|
||||
|
||||
bool coordinatorCheck = (attemptCoordinatorExclude <= faultTolerance);
|
||||
return (ddCheck && coordinatorCheck);
|
||||
}
|
||||
|
|
|
@ -319,7 +319,7 @@ int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::num
|
|||
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID);
|
||||
|
||||
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
|
||||
Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
|
||||
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -3137,9 +3137,8 @@ ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version add
|
|||
//we cannot remove a server immediately after adding it, because a perfectly timed master recovery could cause us to not store the mutations sent to the short lived storage server.
|
||||
if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
|
||||
bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
|
||||
// Current implementation of server erasure is sort of a hack that sets # shards to 0
|
||||
// Defensive check for negative values instead of just 0
|
||||
if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) <= 0) {
|
||||
ASSERT(teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) >= 0);
|
||||
if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) == 0) {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
@ -4283,7 +4282,12 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq, Reference<AsyncV
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req, Reference<DDTeamCollection> self, Database cx) {
|
||||
ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req, Reference<DDTeamCollection> tc,
|
||||
Database cx) {
|
||||
if (!tc.isValid()) {
|
||||
req.reply.send(false);
|
||||
return Void();
|
||||
}
|
||||
state bool safe = true;
|
||||
vector<StorageServerInterface> ssis = wait(getStorageServers(cx));
|
||||
vector<UID> excludeServerIDs;
|
||||
|
@ -4297,7 +4301,7 @@ ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest
|
|||
}
|
||||
}
|
||||
std::sort(excludeServerIDs.begin(), excludeServerIDs.end());
|
||||
for (const auto &team : self->teams) {
|
||||
for (const auto &team : tc->teams) {
|
||||
vector<UID> teamServerIDs = team->getServerIDs();
|
||||
std::sort(teamServerIDs.begin(), teamServerIDs.end());
|
||||
TraceEvent("DDExclusionSafetyCheck")
|
||||
|
|
|
@ -708,8 +708,11 @@ std::pair<vector<ShardsAffectedByTeamFailure::Team>,vector<ShardsAffectedByTeamF
|
|||
|
||||
void ShardsAffectedByTeamFailure::erase(Team team, KeyRange const& range) {
|
||||
if(team_shards.erase( std::pair<Team,KeyRange>(team, range) ) > 0) {
|
||||
for(auto uid = team.servers.begin(); uid != team.servers.end(); ++uid)
|
||||
storageServerShards[*uid]--;
|
||||
for (auto uid = team.servers.begin(); uid != team.servers.end(); ++uid) {
|
||||
if (storageServerShards[*uid] > 0) {
|
||||
storageServerShards[*uid]--;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1498,7 +1498,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
|
||||
// send a snap request to DD
|
||||
if (!commitData->db->get().distributor.present()) {
|
||||
TraceEvent(SevWarnAlways, "DataDistributorNotPresent");
|
||||
TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest");
|
||||
throw operation_failed();
|
||||
}
|
||||
state Future<ErrorOr<Void>> ddSnapReq =
|
||||
|
@ -1531,7 +1531,31 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
}
|
||||
|
||||
ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db, ExclusionSafetyCheckRequest req) {
|
||||
bool safe = wait(db->get().distributor.get().distributorExclCheckReq.getReply(DistributorExclusionSafetyCheckRequest(req.exclusions)));
|
||||
if (!db->get().distributor.present()) {
|
||||
TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "ExclusionSafetyCheck");
|
||||
req.reply.send(false);
|
||||
return Void();
|
||||
}
|
||||
state bool safe = false;
|
||||
loop {
|
||||
try {
|
||||
state Future<ErrorOr<bool>> safeFuture = db->get().distributor.get().distributorExclCheckReq.tryGetReply(
|
||||
DistributorExclusionSafetyCheckRequest(req.exclusions));
|
||||
bool _safe = wait(throwErrorOr(safeFuture));
|
||||
safe = _safe;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("SafetyCheckMasterProxy.DDSafetyCheckResponseError").error(e);
|
||||
if (e.code() == error_code_request_maybe_delivered) {
|
||||
// continue
|
||||
} else if (e.code() != error_code_operation_cancelled) {
|
||||
req.reply.sendError(e);
|
||||
return Void();
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
req.reply.send(safe);
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -939,10 +939,10 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKey
|
|||
// Get all values of keyServers and remove serverID from every occurrence
|
||||
// Very inefficient going over every entry in keyServers
|
||||
// No shortcut because keyServers and serverKeys are not guaranteed same shard boundaries
|
||||
state KeyRange currentKeys = KeyRangeRef(begin, allKeys.end);
|
||||
state Standalone<RangeResultRef> keyServers =
|
||||
wait(krmGetRanges(&tr, keyServersPrefix, currentKeys,
|
||||
wait(krmGetRanges(&tr, keyServersPrefix, KeyRangeRef(begin, allKeys.end),
|
||||
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
|
||||
state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key);
|
||||
for (auto it : keyServers) {
|
||||
vector<UID> src;
|
||||
vector<UID> dest;
|
||||
|
@ -965,7 +965,7 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKey
|
|||
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), currentKeys, allKeys, serverKeysFalse));
|
||||
wait(tr.commit());
|
||||
// Update beginning of next iteration's range
|
||||
begin = keyServers.end()[-1].key;
|
||||
begin = currentKeys.end;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("FailedServerError").error(e);
|
||||
|
|
|
@ -211,10 +211,11 @@ ACTOR Future<std::pair<int64_t,int64_t>> getTLogQueueInfo( Database cx, Referenc
|
|||
|
||||
ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, bool use_system_priority = false) {
|
||||
state Transaction tr( cx );
|
||||
if (use_system_priority)
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
loop {
|
||||
if (use_system_priority) {
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
}
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
Standalone<RangeResultRef> serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
|
||||
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
|
||||
|
|
|
@ -405,8 +405,16 @@ struct RemoveServersSafelyWorkload : TestWorkload {
|
|||
|
||||
std::copy(toKill.begin(), toKill.end(), std::back_inserter(toKillArray));
|
||||
killProcArray = self->getProcesses(toKill);
|
||||
if (toKillArray.size()) {
|
||||
toKillMarkFailedArray.push_back(deterministicRandom()->randomChoice(toKillArray));
|
||||
|
||||
loop {
|
||||
auto failSet = random_subset(toKillArray, deterministicRandom()->randomInt(1, toKillArray.size() / 2 + 2));
|
||||
toKillMarkFailedArray.resize(failSet.size());
|
||||
std::copy(failSet.begin(), failSet.end(), toKillMarkFailedArray.begin());
|
||||
TraceEvent("RemoveAndKill", functionId)
|
||||
.detail("Step", "Safety Check")
|
||||
.detail("Exclusions", describe(toKillMarkFailedArray));
|
||||
bool safe = wait(checkSafeExclusions(cx, toKillMarkFailedArray));
|
||||
if (safe) break;
|
||||
}
|
||||
|
||||
TraceEvent("RemoveAndKill", functionId).detail("Step", "Activate Server Exclusion").detail("KillAddrs", toKill.size()).detail("KillProcs", killProcArray.size()).detail("MissingProcs", toKill.size()!=killProcArray.size()).detail("ToKill", describe(toKill)).detail("Addresses", describe(toKillArray)).detail("ClusterAvailable", g_simulator.isAvailable());
|
||||
|
|
Loading…
Reference in New Issue