addressed code review comments
This commit is contained in:
parent
080fbc63dc
commit
a07cf5d038
|
@ -2146,7 +2146,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
|
|||
|
||||
wait( makeInterruptable(excludeServers(db,addresses,permanentlyFailed)) );
|
||||
|
||||
if (waitForAllExcluded && !permanentlyFailed) {
|
||||
if (waitForAllExcluded) {
|
||||
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
|
||||
printf("(Interrupting this wait with CTRL+C will not cancel the data movement.)\n");
|
||||
}
|
||||
|
|
|
@ -3433,8 +3433,7 @@ ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions) {
|
||||
Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions) {
|
||||
ExclusionSafetyCheckRequest req(exclusions);
|
||||
bool safe = wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::exclusionSafetyCheckReq, req, cx->taskID));
|
||||
return safe;
|
||||
return loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::exclusionSafetyCheckReq, req, cx->taskID);
|
||||
}
|
||||
|
|
|
@ -319,7 +319,7 @@ int64_t extractIntOption( Optional<StringRef> value, int64_t minValue = std::num
|
|||
ACTOR Future<Void> snapCreate(Database cx, StringRef snapCmd, UID snapUID);
|
||||
|
||||
// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
|
||||
ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
|
||||
Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exclusions);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -4288,7 +4288,7 @@ ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest
|
|||
vector<StorageServerInterface> ssis = wait(getStorageServers(cx));
|
||||
vector<UID> excludeServerIDs;
|
||||
// Go through storage server interfaces and translate Address -> server ID (UID)
|
||||
for (auto ssi : ssis) {
|
||||
for (const auto &ssi : ssis) {
|
||||
for (AddressExclusion excl : req.exclusions) {
|
||||
if (excl.excludes(ssi.address())) {
|
||||
excludeServerIDs.push_back(ssi.id());
|
||||
|
@ -4297,7 +4297,7 @@ ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest
|
|||
}
|
||||
}
|
||||
std::sort(excludeServerIDs.begin(), excludeServerIDs.end());
|
||||
for (auto team : self->teams) {
|
||||
for (const auto &team : self->teams) {
|
||||
vector<UID> teamServerIDs = team->getServerIDs();
|
||||
std::sort(teamServerIDs.begin(), teamServerIDs.end());
|
||||
TraceEvent("DDExclusionSafetyCheck")
|
||||
|
|
|
@ -939,8 +939,9 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKey
|
|||
// Get all values of keyServers and remove serverID from every occurrence
|
||||
// Very inefficient going over every entry in keyServers
|
||||
// No shortcut because keyServers and serverKeys are not guaranteed same shard boundaries
|
||||
state KeyRange currentKeys = KeyRangeRef(begin, allKeys.end);
|
||||
state Standalone<RangeResultRef> keyServers =
|
||||
wait(krmGetRanges(&tr, keyServersPrefix, KeyRangeRef(begin, allKeys.end),
|
||||
wait(krmGetRanges(&tr, keyServersPrefix, currentKeys,
|
||||
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
|
||||
for (auto it : keyServers) {
|
||||
vector<UID> src;
|
||||
|
@ -961,12 +962,13 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKey
|
|||
}
|
||||
|
||||
// Set entire range for our serverID in serverKeys keyspace to false to signal erasure
|
||||
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), allKeys, allKeys, serverKeysFalse));
|
||||
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), currentKeys, allKeys, serverKeysFalse));
|
||||
wait(tr.commit());
|
||||
// Update beginning of next iteration's range
|
||||
begin = keyServers.end()[-1].key;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("FailedServerError").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue