Audit location metadata in DD (#10820)

* Audit location metadata in DD

* nits
Zhe Wang 2023-08-25 17:11:11 -07:00 committed by GitHub
parent 3bdcbef465
commit f43b20e15c
4 changed files with 604 additions and 606 deletions

View File

@@ -871,7 +871,8 @@ ACTOR Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
throw e;
}
if (retryCount > 50) {
TraceEvent(SevWarnAlways, "InitAuditMetadataExceedRetryMax", dataDistributorId).errorUnsuppressed(e);
TraceEvent(SevWarnAlways, "AuditUtilInitAuditMetadataExceedRetryMax", dataDistributorId)
.errorUnsuppressed(e);
break;
}
try {
@@ -884,3 +885,257 @@ ACTOR Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
}
return auditStatesToResume;
}
// Check that every pair of ranges in the list is exclusive with (i.e. does not overlap) each other
// This is not part of the consistency check of audit metadata
// It is used to validate the inputs to rangesSame()
bool elementsAreExclusiveWithEachOther(std::vector<KeyRange> ranges) {
ASSERT(std::is_sorted(ranges.begin(), ranges.end(), KeyRangeRef::ArbitraryOrder()));
for (int i = 0; i < ranges.size() - 1; ++i) {
if (ranges[i].end > ranges[i + 1].begin) {
TraceEvent(SevError, "AuditUtilElementsAreNotExclusiveWithEachOther").detail("Ranges", describe(ranges));
return false;
}
}
return true;
}
// Check that no range in the given list of ranges is empty
// This is not part of the consistency check of audit metadata
// It is used to validate the inputs to rangesSame()
bool noEmptyRangeInRanges(std::vector<KeyRange> ranges) {
for (const auto& range : ranges) {
if (range.empty()) {
return false;
}
}
return true;
}
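For illustration (not part of the change): these two predicates are exactly the preconditions that rangesSame() asserts in simulation. A minimal sketch, assuming FDB's "_sr" StringRef literal:
std::vector<KeyRange> sorted;
sorted.push_back(Standalone(KeyRangeRef("a"_sr, "b"_sr)));
sorted.push_back(Standalone(KeyRangeRef("b"_sr, "c"_sr)));
ASSERT(noEmptyRangeInRanges(sorted)); // no [k, k) entries
ASSERT(elementsAreExclusiveWithEachOther(sorted)); // touching at "b" is allowed; overlapping is not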
// Given a list of ranges, which may overlap with each other,
// return a sorted list of exclusive ranges that covers exactly
// the same keys as the input list of ranges
std::vector<KeyRange> coalesceRangeList(std::vector<KeyRange> ranges) {
std::sort(ranges.begin(), ranges.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
std::vector<KeyRange> res;
for (const auto& range : ranges) {
if (res.empty()) {
res.push_back(range);
continue;
}
if (range.begin <= res.back().end) {
if (range.end > res.back().end) { // update res.back if current range extends the back range
KeyRange newBack = Standalone(KeyRangeRef(res.back().begin, range.end));
res.pop_back();
res.push_back(newBack);
}
} else {
res.push_back(range);
}
}
return res;
}
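A minimal sketch of coalesceRangeList's behavior (illustrative only, same "_sr" assumption as above): overlapping inputs are merged into one covering range, disjoint inputs pass through unchanged.
std::vector<KeyRange> input;
input.push_back(Standalone(KeyRangeRef("b"_sr, "d"_sr)));
input.push_back(Standalone(KeyRangeRef("a"_sr, "c"_sr))); // overlaps [b, d)
input.push_back(Standalone(KeyRangeRef("f"_sr, "g"_sr))); // disjoint
std::vector<KeyRange> merged = coalesceRangeList(input);
ASSERT(merged.size() == 2); // { [a, d), [f, g) } covers exactly the input keys
ASSERT(merged[0] == KeyRangeRef("a"_sr, "d"_sr));
ASSERT(merged[1] == KeyRangeRef("f"_sr, "g"_sr));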
// Given two lists of ranges --- rangesA and rangesB --- check whether the two lists cover the same keys
// If not, return the first mismatched pair of ranges from rangesA and rangesB respectively
Optional<std::pair<KeyRange, KeyRange>> rangesSame(std::vector<KeyRange> rangesA, std::vector<KeyRange> rangesB) {
if (g_network->isSimulated()) {
ASSERT(noEmptyRangeInRanges(rangesA));
ASSERT(noEmptyRangeInRanges(rangesB));
}
KeyRange emptyRange;
if (rangesA.empty() && rangesB.empty()) { // no mismatch
return Optional<std::pair<KeyRange, KeyRange>>();
} else if (rangesA.empty() && !rangesB.empty()) { // rangesA is empty while rangesB has a range
return std::make_pair(emptyRange, rangesB.front());
} else if (!rangesA.empty() && rangesB.empty()) { // rangesB is empty while rangesA has a range
return std::make_pair(rangesA.front(), emptyRange);
}
TraceEvent(SevVerbose, "AuditUtilRangesSameBeforeSort").detail("RangesA", rangesA).detail("Rangesb", rangesB);
// sort in ascending order
std::sort(rangesA.begin(), rangesA.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
std::sort(rangesB.begin(), rangesB.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
TraceEvent(SevVerbose, "AuditUtilRangesSameAfterSort").detail("RangesA", rangesA).detail("Rangesb", rangesB);
if (g_network->isSimulated()) {
ASSERT(elementsAreExclusiveWithEachOther(rangesA));
ASSERT(elementsAreExclusiveWithEachOther(rangesB));
}
if (rangesA.front().begin != rangesB.front().begin) { // rangeList heads mismatch
return std::make_pair(rangesA.front(), rangesB.front());
} else if (rangesA.back().end != rangesB.back().end) { // rangeList backs mismatch
return std::make_pair(rangesA.back(), rangesB.back());
}
int ia = 0;
int ib = 0;
KeyRangeRef rangeA = rangesA[0];
KeyRangeRef rangeB = rangesB[0];
KeyRange lastRangeA = Standalone(rangeA);
KeyRange lastRangeB = Standalone(rangeB);
while (true) {
if (rangeA.begin == rangeB.begin) {
if (rangeA.end == rangeB.end) {
if (rangeA.end == rangesA.back().end) {
break;
}
++ia;
++ib;
rangeA = rangesA[ia];
rangeB = rangesB[ib];
lastRangeA = Standalone(rangeA);
lastRangeB = Standalone(rangeB);
} else if (rangeA.end > rangeB.end) {
rangeA = KeyRangeRef(rangeB.end, rangeA.end);
++ib;
rangeB = rangesB[ib];
lastRangeB = Standalone(rangeB);
} else {
rangeB = KeyRangeRef(rangeA.end, rangeB.end);
++ia;
rangeA = rangesA[ia];
lastRangeA = Standalone(rangeA);
}
} else {
return std::make_pair(lastRangeA, lastRangeB);
}
}
return Optional<std::pair<KeyRange, KeyRange>>();
}
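A usage sketch of rangesSame (illustrative only): the walk trims matching key coverage range by range, so two lists that cover the same keys with different split points compare as identical; only a genuine coverage difference is reported.
std::vector<KeyRange> rangesA;
rangesA.push_back(Standalone(KeyRangeRef("a"_sr, "c"_sr)));
rangesA.push_back(Standalone(KeyRangeRef("d"_sr, "f"_sr)));
std::vector<KeyRange> rangesB;
rangesB.push_back(Standalone(KeyRangeRef("a"_sr, "c"_sr)));
rangesB.push_back(Standalone(KeyRangeRef("e"_sr, "f"_sr))); // [d, e) is missing from rangesB
Optional<std::pair<KeyRange, KeyRange>> mismatch = rangesSame(rangesA, rangesB);
ASSERT(mismatch.present()); // reports ([d, f), [e, f))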
// Given an input server, get its owned ranges within the input range via the input transaction,
// from the perspective of the ServerKeys system key space
// Input: (1) SS id; (2) transaction tr; (3) the range to read within
// Return AuditGetServerKeysRes, including: (1) the complete range covered by a single range read;
// (2) the version of the read; (3) the ranges owned by the input SS
ACTOR Future<AuditGetServerKeysRes> getThisServerKeysFromServerKeys(UID serverID, Transaction* tr, KeyRange range) {
state RangeResult readResult;
state AuditGetServerKeysRes res;
try {
wait(store(readResult,
krmGetRanges(tr,
serverKeysPrefixFor(serverID),
range,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
Future<Version> grvF = tr->getReadVersion();
if (!grvF.isReady()) {
TraceEvent(SevWarnAlways, "AuditUtilReadServerKeysGRVError", serverID);
throw audit_storage_cancelled();
}
Version readAtVersion = grvF.get();
TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromServerKeysReadDone", serverID)
.detail("Range", range)
.detail("Prefix", serverKeysPrefixFor(serverID))
.detail("ResultSize", readResult.size())
.detail("AduitServerID", serverID);
std::vector<KeyRange> ownRanges;
for (int i = 0; i < readResult.size() - 1; ++i) {
TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromServerKeysAddToResult", serverID)
.detail("ValueIsServerKeysFalse", readResult[i].value == serverKeysFalse)
.detail("ServerHasKey", serverHasKey(readResult[i].value))
.detail("Range", KeyRangeRef(readResult[i].key, readResult[i + 1].key))
.detail("AduitServerID", serverID);
if (serverHasKey(readResult[i].value)) {
ownRanges.push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key)));
}
}
const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key));
TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromServerKeysEnd", serverID)
.detail("AduitServerID", serverID)
.detail("Range", range)
.detail("Prefix", serverKeysPrefixFor(serverID))
.detail("ReadAtVersion", readAtVersion)
.detail("CompleteRange", completeRange)
.detail("ResultSize", ownRanges.size());
res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges, readResult.logicalSize());
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilGetThisServerKeysError", serverID)
.errorUnsuppressed(e)
.detail("AduitServerID", serverID);
throw e;
}
return res;
}
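A hedged sketch of how a caller can drive this actor; fetchOwnedRanges below is a hypothetical helper (not part of this commit) showing the usual transaction retry loop:
ACTOR Future<std::vector<KeyRange>> fetchOwnedRanges(Database cx, UID ssId, KeyRange range) {
    state Transaction tr(cx);
    loop {
        try {
            tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            AuditGetServerKeysRes res = wait(getThisServerKeysFromServerKeys(ssId, &tr, range));
            // res.completeRange may cover only a prefix of the requested range;
            // a full scan would loop again starting from res.completeRange.end
            return res.ownRanges;
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}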
// Given an input range, get the shard map within it via the input transaction,
// from the perspective of the KeyServers system key space
// Input: (1) audit server ID (for logging); (2) transaction tr; (3) the range to read within
// Return AuditGetKeyServersRes, including: (1) the complete range covered by a single range read; (2) the version of
// the read; (3) a map between SSes and their ranges --- in KeyServers space, a range corresponds to multiple SSes
ACTOR Future<AuditGetKeyServersRes> getShardMapFromKeyServers(UID auditServerId, Transaction* tr, KeyRange range) {
state AuditGetKeyServersRes res;
state std::vector<Future<Void>> actors;
state RangeResult readResult;
state RangeResult UIDtoTagMap;
state int64_t totalShardsCount = 0;
state int64_t shardsInAnonymousPhysicalShardCount = 0;
try {
// read
actors.push_back(store(readResult,
krmGetRanges(tr,
keyServersPrefix,
range,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
actors.push_back(store(UIDtoTagMap, tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)));
wait(waitForAll(actors));
if (UIDtoTagMap.more || UIDtoTagMap.size() >= CLIENT_KNOBS->TOO_MANY) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"AuditUtilReadKeyServersReadTagError",
auditServerId);
throw audit_storage_cancelled();
}
Future<Version> grvF = tr->getReadVersion();
if (!grvF.isReady()) {
TraceEvent(SevWarnAlways, "AuditUtilReadKeyServersGRVError", auditServerId);
throw audit_storage_cancelled();
}
Version readAtVersion = grvF.get();
TraceEvent(SevVerbose, "AuditUtilGetThisServerKeysFromKeyServersReadDone", auditServerId)
.detail("Range", range)
.detail("ResultSize", readResult.size())
.detail("AduitServerID", auditServerId);
// produce result
std::unordered_map<UID, std::vector<KeyRange>> serverOwnRanges;
for (int i = 0; i < readResult.size() - 1; ++i) {
std::vector<UID> src;
std::vector<UID> dest;
UID srcID;
UID destID;
decodeKeyServersValue(UIDtoTagMap, readResult[i].value, src, dest, srcID, destID);
if (srcID == anonymousShardId) {
shardsInAnonymousPhysicalShardCount++;
}
totalShardsCount++;
std::vector<UID> servers(src.size() + dest.size());
std::merge(src.begin(), src.end(), dest.begin(), dest.end(), servers.begin());
for (auto& ssid : servers) {
serverOwnRanges[ssid].push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key)));
}
}
const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key));
TraceEvent(SevInfo, "AuditUtilGetThisServerKeysFromKeyServersEnd", auditServerId)
.detail("Range", range)
.detail("CompleteRange", completeRange)
.detail("AtVersion", readAtVersion)
.detail("ShardsInAnonymousPhysicalShardCount", shardsInAnonymousPhysicalShardCount)
.detail("TotalShardsCount", totalShardsCount);
res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges, readResult.logicalSize());
} catch (Error& e) {
TraceEvent(SevDebug, "AuditUtilGetThisServerKeysFromKeyServersError", auditServerId)
.errorUnsuppressed(e)
.detail("AuditServerId", auditServerId);
throw e;
}
return res;
}
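A sketch of consuming the result (illustrative fragment, assumed to run inside an actor with a Transaction tr, an auditorId, and a rangeToRead in scope): a shard being moved lists both its src and dest teams, so one server can appear with overlapping ranges, which is why callers coalesce before comparing.
AuditGetKeyServersRes ksRes = wait(getShardMapFromKeyServers(auditorId, &tr, rangeToRead));
for (auto& [ssid, ranges] : ksRes.rangeOwnershipMap) {
    std::vector<KeyRange> merged = coalesceRangeList(ranges);
    // merged is sorted and pairwise disjoint --- a valid input to rangesSame()
}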

View File

@@ -82,5 +82,41 @@ ACTOR Future<std::vector<AuditStorageState>> initAuditMetadata(Database cx,
bool ddEnabled,
UID dataDistributorId,
int persistFinishAuditCount);
struct AuditGetServerKeysRes {
KeyRange completeRange;
Version readAtVersion;
UID serverId;
std::vector<KeyRange> ownRanges;
int64_t readBytes;
AuditGetServerKeysRes() = default;
AuditGetServerKeysRes(KeyRange completeRange,
Version readAtVersion,
UID serverId,
std::vector<KeyRange> ownRanges,
int64_t readBytes)
: completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges),
readBytes(readBytes) {}
};
struct AuditGetKeyServersRes {
KeyRange completeRange;
Version readAtVersion;
int64_t readBytes;
std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap;
AuditGetKeyServersRes() = default;
AuditGetKeyServersRes(KeyRange completeRange,
Version readAtVersion,
std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap,
int64_t readBytes)
: completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap),
readBytes(readBytes) {}
};
std::vector<KeyRange> coalesceRangeList(std::vector<KeyRange> ranges);
Optional<std::pair<KeyRange, KeyRange>> rangesSame(std::vector<KeyRange> rangesA, std::vector<KeyRange> rangesB);
ACTOR Future<AuditGetServerKeysRes> getThisServerKeysFromServerKeys(UID serverID, Transaction* tr, KeyRange range);
ACTOR Future<AuditGetKeyServersRes> getShardMapFromKeyServers(UID auditServerId, Transaction* tr, KeyRange range);
#include "flow/unactorcompiler.h"
#endif

View File

@@ -330,17 +330,23 @@ ACTOR Future<Void> auditStorageCore(Reference<DataDistributor> self,
UID auditID,
AuditType auditType,
int currentRetryCount);
- ACTOR Future<UID> launchAudit(Reference<DataDistributor> self, KeyRange auditRange, AuditType auditType);
+ ACTOR Future<UID> launchAudit(Reference<DataDistributor> self,
+ KeyRange auditRange,
+ AuditType auditType,
+ KeyValueStoreType auditStorageEngineType);
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req);
void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
ACTOR Future<Void> dispatchAuditStorageServerShard(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
ACTOR Future<Void> scheduleAuditStorageShardOnServer(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
StorageServerInterface ssi);
- ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self,
+ ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
+ ACTOR Future<Void> dispatchAuditLocationMetadata(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
- ACTOR Future<Void> dispatchAuditStorage(Reference<DataDistributor> self, std::shared_ptr<DDAudit> audit);
+ ACTOR Future<Void> doAuditLocationMetadata(Reference<DataDistributor> self,
+ std::shared_ptr<DDAudit> audit,
+ KeyRange auditRange);
ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range);
@@ -2327,7 +2333,7 @@ void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAud
} else if (audit->coreState.getType() == AuditType::ValidateReplica) {
audit->actors.add(dispatchAuditStorage(self, audit));
} else if (audit->coreState.getType() == AuditType::ValidateLocationMetadata) {
- audit->actors.add(scheduleAuditLocationMetadata(self, audit, allKeys));
+ audit->actors.add(dispatchAuditLocationMetadata(self, audit, allKeys));
} else if (audit->coreState.getType() == AuditType::ValidateStorageServerShard) {
audit->actors.add(dispatchAuditStorageServerShard(self, audit));
} else {
@@ -2338,20 +2344,18 @@ void loadAndDispatchAudit(Reference<DataDistributor> self, std::shared_ptr<DDAud
// This function is for location metadata audits
// It schedules audit tasks on the input range
- ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self,
+ ACTOR Future<Void> dispatchAuditLocationMetadata(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange range) {
state const AuditType auditType = audit->coreState.getType();
ASSERT(auditType == AuditType::ValidateLocationMetadata);
TraceEvent(SevInfo, "DDScheduleAuditLocationMetadataBegin", self->ddId)
TraceEvent(SevInfo, "DDdispatchAuditLocationMetadataBegin", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", auditType);
state Key begin = range.begin;
state KeyRange currentRange = range;
state std::vector<AuditStorageState> auditStates;
state int64_t issueDoAuditCount = 0;
- state AuditStorageRequest req;
- state std::vector<StorageServerInterface> targetCandidates;
try {
while (begin < range.end) {
@@ -2361,7 +2365,7 @@ ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self
getAuditStateByRange(self->txnProcessor->context(), auditType, audit->coreState.id, currentRange)));
ASSERT(!auditStates.empty());
begin = auditStates.back().range.end;
TraceEvent(SevInfo, "DDScheduleAuditLocationMetadataDispatch", self->ddId)
TraceEvent(SevInfo, "DDdispatchAuditLocationMetadataDispatch", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("CurrentRange", currentRange)
.detail("AuditType", auditType)
@@ -2386,23 +2390,17 @@ ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self
audit->remainingBudgetForAuditTasks.set(audit->remainingBudgetForAuditTasks.get() - 1);
ASSERT(audit->remainingBudgetForAuditTasks.get() >= 0);
TraceEvent(SevDebug, "RemainingBudgetForAuditTasks")
.detail("Loc", "scheduleAuditLocationMetadata")
.detail("Loc", "dispatchAuditLocationMetadata")
.detail("Ops", "Decrease")
.detail("Val", audit->remainingBudgetForAuditTasks.get())
.detail("AuditType", auditType);
- req = AuditStorageRequest(audit->coreState.id, auditStates[i].range, auditType);
issueDoAuditCount++;
- req.ddId = self->ddId; // send this ddid to SS
- targetCandidates.clear();
- wait(store(targetCandidates, getStorageServers(self->txnProcessor->context())));
- StorageServerInterface targetServer = deterministicRandom()->randomChoice(targetCandidates);
- audit->actors.add(doAuditOnStorageServer(self, audit, targetServer, req));
+ audit->actors.add(doAuditLocationMetadata(self, audit, auditStates[i].range));
}
}
wait(delay(0.1));
}
TraceEvent(SevInfo, "DDScheduleAuditLocationMetadataEnd", self->ddId)
TraceEvent(SevInfo, "DDdispatchAuditLocationMetadataEnd", self->ddId)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", auditType)
.detail("IssuedDoAuditCount", issueDoAuditCount);
@@ -2411,7 +2409,7 @@ ACTOR Future<Void> scheduleAuditLocationMetadata(Reference<DataDistributor> self
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevWarn, "DDScheduleAuditLocationMetadataError", self->ddId)
TraceEvent(SevWarn, "DDdispatchAuditLocationMetadataError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->coreState.id)
.detail("AuditType", auditType);
@@ -2767,7 +2765,7 @@ ACTOR Future<Void> scheduleAuditOnRange(Reference<DataDistributor> self,
if (auditType == AuditType::ValidateHA) {
if (rangeLocations[rangeLocationIndex].servers.size() < 2) {
TraceEvent(SevInfo, "DDScheduleAuditOnRangeEnd", self->ddId)
.detail("Reason", "Single replica, ignore")
.detail("Reason", "Single DC, ignore")
.detail("AuditID", audit->coreState.id)
.detail("AuditRange", audit->coreState.range)
.detail("AuditType", auditType);
@@ -2977,6 +2975,8 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
StorageServerInterface ssi,
AuditStorageRequest req) {
state AuditType auditType = req.getType();
ASSERT(auditType == AuditType::ValidateHA || auditType == AuditType::ValidateReplica ||
auditType == AuditType::ValidateStorageServerShard);
TraceEvent(SevInfo, "DDDoAuditOnStorageServerBegin", self->ddId)
.detail("AuditID", req.id)
.detail("Range", req.range)
@@ -3049,16 +3049,286 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
} else {
ASSERT(req.getType() != AuditType::ValidateStorageServerShard);
audit->retryCount++;
- if (req.getType() == AuditType::ValidateLocationMetadata) {
- audit->actors.add(scheduleAuditLocationMetadata(self, audit, req.range));
- } else {
- audit->actors.add(scheduleAuditOnRange(self, audit, req.range));
- }
+ audit->actors.add(scheduleAuditOnRange(self, audit, req.range));
}
}
return Void();
}
// Check consistency between KeyServers and ServerKeys system key space
ACTOR Future<Void> doAuditLocationMetadata(Reference<DataDistributor> self,
std::shared_ptr<DDAudit> audit,
KeyRange auditRange) {
ASSERT(audit->coreState.getType() == AuditType::ValidateLocationMetadata);
TraceEvent(SevInfo, "DDDoAuditLocationMetadataBegin", self->ddId)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange);
state AuditStorageState res(audit->coreState.id, audit->coreState.getType()); // we will set range of audit later
state std::vector<Future<Void>> actors;
state std::vector<std::string> errors;
state AuditGetKeyServersRes keyServerRes;
state std::unordered_map<UID, AuditGetServerKeysRes> serverKeyResMap;
state Version readAtVersion;
state std::unordered_map<UID, std::vector<KeyRange>> mapFromKeyServersRaw;
// Note that krmGetRanges may not return the entire requested range at a time
// Given auditRange, only a part of the range may be returned, thus, only that part
// can be compared --- claimRange
// Given claimRange, rangeToRead is decided for reading the remaining range
// At the beginning, rangeToRead is auditRange
state KeyRange claimRange;
state KeyRange completeRangeByKeyServer;
// To compare
state std::unordered_map<UID, std::vector<KeyRange>> mapFromServerKeys;
state std::unordered_map<UID, std::vector<KeyRange>> mapFromKeyServers;
state Transaction tr(self->txnProcessor->context());
state Key rangeToReadBegin = auditRange.begin;
state KeyRangeRef rangeToRead;
state int64_t cumulatedValidatedServerKeysNum = 0;
state int64_t cumulatedValidatedKeyServersNum = 0;
state Reference<IRateControl> rateLimiter =
Reference<IRateControl>(new SpeedLimit(SERVER_KNOBS->AUDIT_STORAGE_RATE_PER_SERVER_MAX, 1));
state int64_t remoteReadBytes = 0;
try {
loop {
try {
// Read
actors.clear();
errors.clear();
mapFromServerKeys.clear();
mapFromKeyServers.clear();
serverKeyResMap.clear();
mapFromKeyServersRaw.clear();
remoteReadBytes = 0;
rangeToRead = KeyRangeRef(rangeToReadBegin, auditRange.end);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
// Read KeyServers
wait(store(keyServerRes, getShardMapFromKeyServers(self->ddId, &tr, rangeToRead)));
completeRangeByKeyServer = keyServerRes.completeRange;
readAtVersion = keyServerRes.readAtVersion;
mapFromKeyServersRaw = keyServerRes.rangeOwnershipMap;
remoteReadBytes += keyServerRes.readBytes;
// Use ssid of mapFromKeyServersRaw to read ServerKeys
for (auto& [ssid, _] : mapFromKeyServersRaw) {
actors.push_back(
store(serverKeyResMap[ssid], getThisServerKeysFromServerKeys(ssid, &tr, rangeToRead)));
}
wait(waitForAll(actors));
// Decide claimRange and check readAtVersion
claimRange = completeRangeByKeyServer;
for (auto& [ssid, serverKeyRes] : serverKeyResMap) {
KeyRange serverKeyCompleteRange = serverKeyRes.completeRange;
TraceEvent(SevVerbose, "DDDoAuditLocationMetadataGetClaimRange", self->ddId)
.detail("ServerId", ssid)
.detail("ServerKeyCompleteRange", serverKeyCompleteRange)
.detail("CurrentClaimRange", claimRange);
KeyRange overlappingRange = serverKeyCompleteRange & claimRange;
if (serverKeyCompleteRange.begin != claimRange.begin || overlappingRange.empty() ||
readAtVersion != serverKeyRes.readAtVersion) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"DDDoAuditLocationMetadataReadCheckWrong",
self->ddId)
.detail("ServerKeyCompleteRangeBegin", serverKeyCompleteRange.begin)
.detail("ClaimRangeBegin", claimRange.begin)
.detail("OverlappingRange", overlappingRange)
.detail("ReadAtVersion", readAtVersion)
.detail("ServerKeyResReadAtVersion", serverKeyRes.readAtVersion);
throw audit_storage_cancelled();
}
claimRange = overlappingRange;
remoteReadBytes += serverKeyRes.readBytes;
}
// Use claimRange to get mapFromServerKeys and mapFromKeyServers to compare
int64_t numValidatedServerKeys = 0;
for (auto& [ssid, serverKeyRes] : serverKeyResMap) {
for (auto& range : serverKeyRes.ownRanges) {
KeyRange overlappingRange = range & claimRange;
if (overlappingRange.empty()) {
continue;
}
TraceEvent(SevVerbose, "DDDoAuditLocationMetadataAddToServerKeyMap", self->ddId)
.detail("RawRange", range)
.detail("ClaimRange", claimRange)
.detail("Range", overlappingRange)
.detail("SSID", ssid);
mapFromServerKeys[ssid].push_back(overlappingRange);
numValidatedServerKeys++;
}
}
cumulatedValidatedServerKeysNum = cumulatedValidatedServerKeysNum + numValidatedServerKeys;
int64_t numValidatedKeyServers = 0;
for (auto& [ssid, ranges] : mapFromKeyServersRaw) {
std::vector mergedRanges = coalesceRangeList(ranges);
for (auto& range : mergedRanges) {
KeyRange overlappingRange = range & claimRange;
if (overlappingRange.empty()) {
continue;
}
TraceEvent(SevVerbose, "DDDoAuditLocationMetadataAddToKeyServerMap", self->ddId)
.detail("RawRange", range)
.detail("ClaimRange", claimRange)
.detail("Range", overlappingRange)
.detail("SSID", ssid);
mapFromKeyServers[ssid].push_back(overlappingRange);
numValidatedKeyServers++;
}
}
cumulatedValidatedKeyServersNum = cumulatedValidatedKeyServersNum + numValidatedKeyServers;
// Compare: check if mapFromKeyServers === mapFromServerKeys
// 1. check mapFromKeyServers => mapFromServerKeys
for (auto& [ssid, keyServerRanges] : mapFromKeyServers) {
if (!mapFromServerKeys.contains(ssid)) {
std::string error =
format("KeyServers and serverKeys mismatch: Some key in range(%s, %s) exists "
"on Server(%s) in KeyServers but not ServerKeys",
claimRange.toString().c_str(),
claimRange.toString().c_str(),
ssid.toString().c_str());
errors.push_back(error);
TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange)
.detail("ClaimRange", claimRange)
.detail("ErrorMessage", error);
}
std::vector<KeyRange> serverKeyRanges = mapFromServerKeys[ssid];
Optional<std::pair<KeyRange, KeyRange>> anyMismatch = rangesSame(keyServerRanges, serverKeyRanges);
if (anyMismatch.present()) { // mismatch detected
KeyRange mismatchedRangeByKeyServer = anyMismatch.get().first;
KeyRange mismatchedRangeByServerKey = anyMismatch.get().second;
std::string error =
format("KeyServers and serverKeys mismatch on Server(%s): KeyServer: %s; ServerKey: %s",
ssid.toString().c_str(),
mismatchedRangeByKeyServer.toString().c_str(),
mismatchedRangeByServerKey.toString().c_str());
errors.push_back(error);
TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange)
.detail("ClaimRange", claimRange)
.detail("ErrorMessage", error)
.detail("MismatchedRangeByKeyServer", mismatchedRangeByKeyServer)
.detail("MismatchedRangeByServerKey", mismatchedRangeByServerKey);
}
}
// 2. check mapFromServerKeys => mapFromKeyServers
for (auto& [ssid, serverKeyRanges] : mapFromServerKeys) {
if (!mapFromKeyServers.contains(ssid)) {
std::string error =
format("KeyServers and serverKeys mismatch: Some key of range(%s, %s) exists "
"on Server(%s) in ServerKeys but not KeyServers",
claimRange.toString().c_str(),
claimRange.toString().c_str(),
ssid.toString().c_str());
errors.push_back(error);
TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange)
.detail("ClaimRange", claimRange)
.detail("ErrorMessage", error);
}
}
// Log statistic
TraceEvent(SevInfo, "DDDoAuditLocationMetadataMetadata", self->ddId)
.suppressFor(30.0)
.detail("AuditType", audit->coreState.getType())
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange)
.detail("CurrentValidatedServerKeysNum", numValidatedServerKeys)
.detail("CurrentValidatedKeyServersNum", numValidatedServerKeys)
.detail("CurrentValidatedInclusiveRange", claimRange)
.detail("CumulatedValidatedServerKeysNum", cumulatedValidatedServerKeysNum)
.detail("CumulatedValidatedKeyServersNum", cumulatedValidatedKeyServersNum)
.detail("CumulatedValidatedInclusiveRange", KeyRangeRef(auditRange.begin, claimRange.end));
// Return result
if (!errors.empty()) {
TraceEvent(SevError, "DDDoAuditLocationMetadataError", self->ddId)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange)
.detail("NumErrors", errors.size())
.detail("Version", readAtVersion)
.detail("ClaimRange", claimRange);
res.range = claimRange;
res.setPhase(AuditPhase::Error);
res.ddId = self->ddId; // used to compare self->ddId with existing persisted ddId
wait(persistAuditStateByRange(self->txnProcessor->context(), res));
throw audit_storage_error();
} else {
// Expand persisted complete range
res.range = Standalone(KeyRangeRef(auditRange.begin, claimRange.end));
res.setPhase(AuditPhase::Complete);
res.ddId = self->ddId; // used to compare self->ddId with existing persisted ddId
wait(persistAuditStateByRange(self->txnProcessor->context(), res));
if (res.range.end < auditRange.end) {
TraceEvent(SevInfo, "DDDoAuditLocationMetadataPartialDone", self->ddId)
.suppressFor(10.0)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange)
.detail("Version", readAtVersion)
.detail("CompleteRange", res.range);
rangeToReadBegin = res.range.end;
} else { // complete
TraceEvent(SevInfo, "DDDoAuditLocationMetadataComplete", self->ddId)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange)
.detail("CompleteRange", res.range)
.detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum)
.detail("NumValidatedKeyServers", cumulatedValidatedKeyServersNum);
break;
}
}
} catch (Error& e) {
wait(tr.onError(e));
}
wait(rateLimiter->getAllowance(remoteReadBytes)); // Rate Keeping
}
audit->remainingBudgetForAuditTasks.set(audit->remainingBudgetForAuditTasks.get() + 1);
ASSERT(audit->remainingBudgetForAuditTasks.get() <= SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX);
TraceEvent(SevDebug, "RemainingBudgetForAuditTasks")
.detail("Loc", "doAuditLocationMetadata")
.detail("Ops", "Increase")
.detail("Val", audit->remainingBudgetForAuditTasks.get())
.detail("AuditType", audit->coreState.getType());
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw e;
}
TraceEvent(SevInfo, "DDDoAuditLocationMetadataFailed", self->ddId)
.errorUnsuppressed(e)
.detail("AuditId", audit->coreState.id)
.detail("AuditRange", auditRange);
audit->remainingBudgetForAuditTasks.set(audit->remainingBudgetForAuditTasks.get() + 1);
ASSERT(audit->remainingBudgetForAuditTasks.get() <= SERVER_KNOBS->CONCURRENT_AUDIT_TASK_COUNT_MAX);
TraceEvent(SevDebug, "RemainingBudgetForAuditTasks")
.detail("Loc", "doAuditLocationMetadataFailed")
.detail("Ops", "Increase")
.detail("Val", audit->remainingBudgetForAuditTasks.get())
.detail("AuditType", audit->coreState.getType());
if (e.code() == error_code_audit_storage_error) {
throw audit_storage_error();
} else if (e.code() == error_code_audit_storage_cancelled) {
throw audit_storage_cancelled();
} else if (audit->retryCount >= SERVER_KNOBS->AUDIT_RETRY_COUNT_MAX) {
throw audit_storage_failed();
} else {
audit->retryCount++;
audit->actors.add(dispatchAuditLocationMetadata(self, audit, auditRange));
}
}
return Void();
}
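The claimRange narrowing used above, in miniature (illustrative only, assuming FDB's "_sr" StringRef literal): each round compares only the intersection of the keyServers complete range with every server's serverKeys complete range, and the next round resumes at that intersection's end.
KeyRange claim = Standalone(KeyRangeRef("a"_sr, "z"_sr)); // complete range from keyServers
std::vector<KeyRange> serverCompletes;
serverCompletes.push_back(Standalone(KeyRangeRef("a"_sr, "m"_sr)));
serverCompletes.push_back(Standalone(KeyRangeRef("a"_sr, "q"_sr)));
for (const auto& c : serverCompletes) {
    KeyRange overlap = c & claim; // operator& yields the intersection
    ASSERT(!overlap.empty() && overlap.begin == claim.begin);
    claim = overlap;
}
ASSERT(claim == KeyRangeRef("a"_sr, "m"_sr)); // only [a, m) is compared this round;
// rangeToReadBegin then advances to "m" for the next iteration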
ACTOR Future<Void> dataDistributor_impl(DataDistributorInterface di,
Reference<DataDistributor> self,
IsMocked isMocked) {

View File

@@ -5000,121 +5000,6 @@ Key constructMappedKey(KeyValueRef* keyValue, std::vector<Optional<Tuple>>& vec,
return mappedKeyTuple.pack();
}
// Check if any pair of ranges are exclusive with each other
// This is not a part in consistency check of audit metadata
// This is used for checking the validity of inputs to rangesSame()
bool elementsAreExclusiveWithEachOther(std::vector<KeyRange> ranges) {
ASSERT(std::is_sorted(ranges.begin(), ranges.end(), KeyRangeRef::ArbitraryOrder()));
for (int i = 0; i < ranges.size() - 1; ++i) {
if (ranges[i].end > ranges[i + 1].begin) {
TraceEvent(SevError, "ElementsAreNotExclusiveWithEachOther").detail("Ranges", describe(ranges));
return false;
}
}
return true;
}
// Check if any range is empty in the given list of ranges
// This is not a part in consistency check of audit metadata
// This is used for checking the validity of inputs to rangesSame()
bool noEmptyRangeInRanges(std::vector<KeyRange> ranges) {
for (const auto& range : ranges) {
if (range.empty()) {
return false;
}
}
return true;
}
// Given a list of ranges, where ranges can overlap with each other
// Return a list of exclusive ranges which covers the ranges exactly
// the same as the input list of ranges
std::vector<KeyRange> coalesceRangeList(std::vector<KeyRange> ranges) {
std::sort(ranges.begin(), ranges.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
std::vector<KeyRange> res;
for (const auto& range : ranges) {
if (res.empty()) {
res.push_back(range);
continue;
}
if (range.begin <= res.back().end) {
if (range.end > res.back().end) { // update res.back if current range extends the back range
KeyRange newBack = Standalone(KeyRangeRef(res.back().begin, range.end));
res.pop_back();
res.push_back(newBack);
}
} else {
res.push_back(range);
}
}
return res;
}
// Given two lists of ranges --- rangesA and rangesB, check if two lists are identical
// If not, return the mismatched two ranges of rangeA and rangeB respectively
Optional<std::pair<KeyRange, KeyRange>> rangesSame(std::vector<KeyRange> rangesA, std::vector<KeyRange> rangesB) {
if (g_network->isSimulated()) {
ASSERT(noEmptyRangeInRanges(rangesA));
ASSERT(noEmptyRangeInRanges(rangesB));
}
KeyRange emptyRange;
if (rangesA.empty() && rangesB.empty()) { // no mismatch
return Optional<std::pair<KeyRange, KeyRange>>();
} else if (rangesA.empty() && !rangesB.empty()) { // rangesA is empty while rangesB has a range
return std::make_pair(emptyRange, rangesB.front());
} else if (!rangesA.empty() && rangesB.empty()) { // rangesB is empty while rangesA has a range
return std::make_pair(rangesA.front(), emptyRange);
}
TraceEvent(SevVerbose, "RangesSameBeforeSort").detail("RangesA", rangesA).detail("Rangesb", rangesB);
// sort in ascending order
std::sort(rangesA.begin(), rangesA.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
std::sort(rangesB.begin(), rangesB.end(), [](KeyRange a, KeyRange b) { return a.begin < b.begin; });
TraceEvent(SevVerbose, "RangesSameAfterSort").detail("RangesA", rangesA).detail("Rangesb", rangesB);
if (g_network->isSimulated()) {
ASSERT(elementsAreExclusiveWithEachOther(rangesA));
ASSERT(elementsAreExclusiveWithEachOther(rangesB));
}
if (rangesA.front().begin != rangesB.front().begin) { // rangeList heads mismatch
return std::make_pair(rangesA.front(), rangesB.front());
} else if (rangesA.back().end != rangesB.back().end) { // rangeList backs mismatch
return std::make_pair(rangesA.back(), rangesB.back());
}
int ia = 0;
int ib = 0;
KeyRangeRef rangeA = rangesA[0];
KeyRangeRef rangeB = rangesB[0];
KeyRange lastRangeA = Standalone(rangeA);
KeyRange lastRangeB = Standalone(rangeB);
while (true) {
if (rangeA.begin == rangeB.begin) {
if (rangeA.end == rangeB.end) {
if (rangeA.end == rangesA.back().end) {
break;
}
++ia;
++ib;
rangeA = rangesA[ia];
rangeB = rangesB[ib];
lastRangeA = Standalone(rangeA);
lastRangeB = Standalone(rangeB);
} else if (rangeA.end > rangeB.end) {
rangeA = KeyRangeRef(rangeB.end, rangeA.end);
++ib;
rangeB = rangesB[ib];
lastRangeB = Standalone(rangeB);
} else {
rangeB = KeyRangeRef(rangeA.end, rangeB.end);
++ia;
rangeA = rangesA[ia];
lastRangeA = Standalone(rangeA);
}
} else {
return std::make_pair(lastRangeA, lastRangeB);
}
}
return Optional<std::pair<KeyRange, KeyRange>>();
}
struct AuditGetShardInfoRes {
Version readAtVersion;
UID serverId;
@@ -5153,175 +5038,6 @@ AuditGetShardInfoRes getThisServerShardInfo(StorageServer* data, KeyRange range)
return AuditGetShardInfoRes(data->version.get(), data->thisServerID, ownRange);
}
struct AuditGetServerKeysRes {
KeyRange completeRange;
Version readAtVersion;
UID serverId;
std::vector<KeyRange> ownRanges;
int64_t readBytes;
AuditGetServerKeysRes() = default;
AuditGetServerKeysRes(KeyRange completeRange,
Version readAtVersion,
UID serverId,
std::vector<KeyRange> ownRanges,
int64_t readBytes)
: completeRange(completeRange), readAtVersion(readAtVersion), serverId(serverId), ownRanges(ownRanges),
readBytes(readBytes) {}
};
// Given an input server, get ranges within the input range via the input transaction
// from the perspective of ServerKeys system key space
// Input: (1) SS id; (2) transaction tr; (3) within range
// Return AuditGetServerKeysRes, including: (1) complete range by a single read range;
// (2) verison of the read; (3) ranges of the input SS
ACTOR Future<AuditGetServerKeysRes> getThisServerKeysFromServerKeys(UID serverID, Transaction* tr, KeyRange range) {
state RangeResult readResult;
state AuditGetServerKeysRes res;
try {
wait(store(readResult,
krmGetRanges(tr,
serverKeysPrefixFor(serverID),
range,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
Future<Version> grvF = tr->getReadVersion();
if (!grvF.isReady()) {
TraceEvent(SevWarnAlways, "SSAuditStorageReadServerKeysGRVError", serverID);
throw audit_storage_cancelled();
}
Version readAtVersion = grvF.get();
TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromServerKeysReadDone", serverID)
.detail("Range", range)
.detail("Prefix", serverKeysPrefixFor(serverID))
.detail("ResultSize", readResult.size())
.detail("AduitServerID", serverID);
std::vector<KeyRange> ownRanges;
for (int i = 0; i < readResult.size() - 1; ++i) {
TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromServerKeysAddToResult", serverID)
.detail("ValueIsServerKeysFalse", readResult[i].value == serverKeysFalse)
.detail("ServerHasKey", serverHasKey(readResult[i].value))
.detail("Range", KeyRangeRef(readResult[i].key, readResult[i + 1].key))
.detail("AduitServerID", serverID);
if (serverHasKey(readResult[i].value)) {
KeyRange shardRange;
ownRanges.push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key)));
}
}
const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key));
TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromServerKeysEnd", serverID)
.detail("AduitServerID", serverID)
.detail("Range", range)
.detail("Prefix", serverKeysPrefixFor(serverID))
.detail("ReadAtVersion", readAtVersion)
.detail("CompleteRange", completeRange)
.detail("ResultSize", ownRanges.size());
res = AuditGetServerKeysRes(completeRange, readAtVersion, serverID, ownRanges, readResult.logicalSize());
} catch (Error& e) {
TraceEvent(SevDebug, "SSAuditStorageGetThisServerKeysError", serverID)
.errorUnsuppressed(e)
.detail("AduitServerID", serverID);
throw e;
}
return res;
}
struct AuditGetKeyServersRes {
KeyRange completeRange;
Version readAtVersion;
int64_t readBytes;
std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap;
AuditGetKeyServersRes() = default;
AuditGetKeyServersRes(KeyRange completeRange,
Version readAtVersion,
std::unordered_map<UID, std::vector<KeyRange>> rangeOwnershipMap,
int64_t readBytes)
: completeRange(completeRange), readAtVersion(readAtVersion), rangeOwnershipMap(rangeOwnershipMap),
readBytes(readBytes) {}
};
// Given an input server, get ranges within the input range via the input transaction
// from the perspective of KeyServers system key space
// Input: (1) Audit Server ID (for logging); (2) transaction tr; (3) within range
// Return AuditGetKeyServersRes, including : (1) complete range by a single read range; (2) verison of the read;
// (3) map between SSes and their ranges --- in KeyServers space, a range corresponds to multiple SSes
ACTOR Future<AuditGetKeyServersRes> getShardMapFromKeyServers(UID auditServerId, Transaction* tr, KeyRange range) {
state AuditGetKeyServersRes res;
state std::vector<Future<Void>> actors;
state RangeResult readResult;
state RangeResult UIDtoTagMap;
state int64_t totalShardsCount = 0;
state int64_t shardsInAnonymousPhysicalShardCount = 0;
try {
// read
actors.push_back(store(readResult,
krmGetRanges(tr,
keyServersPrefix,
range,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
actors.push_back(store(UIDtoTagMap, tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)));
wait(waitForAll(actors));
if (UIDtoTagMap.more || UIDtoTagMap.size() >= CLIENT_KNOBS->TOO_MANY) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"SSAuditStorageReadKeyServersReadTagError",
auditServerId);
throw audit_storage_cancelled();
}
Future<Version> grvF = tr->getReadVersion();
if (!grvF.isReady()) {
TraceEvent(SevWarnAlways, "SSAuditStorageReadKeyServersGRVError", auditServerId);
throw audit_storage_cancelled();
}
Version readAtVersion = grvF.get();
TraceEvent(SevVerbose, "SSAuditStorageGetThisServerKeysFromKeyServersReadDone", auditServerId)
.detail("Range", range)
.detail("ResultSize", readResult.size())
.detail("AduitServerID", auditServerId);
// produce result
std::unordered_map<UID, std::vector<KeyRange>> serverOwnRanges;
for (int i = 0; i < readResult.size() - 1; ++i) {
std::vector<UID> src;
std::vector<UID> dest;
UID srcID;
UID destID;
decodeKeyServersValue(UIDtoTagMap, readResult[i].value, src, dest, srcID, destID);
if (srcID == anonymousShardId) {
shardsInAnonymousPhysicalShardCount++;
}
totalShardsCount++;
std::vector<UID> servers(src.size() + dest.size());
std::merge(src.begin(), src.end(), dest.begin(), dest.end(), servers.begin());
for (auto& ssid : servers) {
serverOwnRanges[ssid].push_back(Standalone(KeyRangeRef(readResult[i].key, readResult[i + 1].key)));
}
}
const KeyRange completeRange = Standalone(KeyRangeRef(range.begin, readResult.back().key));
TraceEvent(SevInfo, "SSAuditStorageGetThisServerKeysFromKeyServersEnd", auditServerId)
.detail("Range", range)
.detail("CompleteRange", completeRange)
.detail("AtVersion", readAtVersion)
.detail("ShardsInAnonymousPhysicalShardCount", shardsInAnonymousPhysicalShardCount)
.detail("TotalShardsCount", totalShardsCount);
res = AuditGetKeyServersRes(completeRange, readAtVersion, serverOwnRanges, readResult.logicalSize());
} catch (Error& e) {
TraceEvent(SevDebug, "SSAuditStorageGetThisServerKeysFromKeyServersError", auditServerId)
.errorUnsuppressed(e)
.detail("AuditServerId", auditServerId);
throw e;
}
return res;
}
// Check consistency between StorageServer->shardInfo and ServerKeys system key space
ACTOR Future<Void> auditStorageServerShardQ(StorageServer* data, AuditStorageRequest req) {
ASSERT(req.getType() == AuditType::ValidateStorageServerShard);
@@ -5665,6 +5381,13 @@ ACTOR Future<Void> auditStorageServerShardQ(StorageServer* data, AuditStorageReq
rangeToReadBegin = res.range.end;
} else { // complete
req.reply.send(res);
TraceEvent(SevInfo, "SSAuditStorageSsShardComplete", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("AuditServer", data->thisServerID)
.detail("CompleteRange", res.range)
.detail("NumValidatedLocalShards", cumulatedValidatedLocalShardsNum)
.detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum);
break;
}
}
@@ -5697,14 +5420,6 @@ ACTOR Future<Void> auditStorageServerShardQ(StorageServer* data, AuditStorageReq
}
}
TraceEvent(SevInfo, "SSAuditStorageSsShardComplete", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("AuditServer", data->thisServerID)
.detail("CompleteRange", res.range)
.detail("NumValidatedLocalShards", cumulatedValidatedLocalShardsNum)
.detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum);
// Make sure the history collection is not open due to this audit
data->stopTrackShardAssignment();
TraceEvent(SevVerbose, "SSShardAssignmentHistoryRecordStopWhenExit", data->thisServerID).detail("AuditID", req.id);
@@ -5712,283 +5427,6 @@ ACTOR Future<Void> auditStorageServerShardQ(StorageServer* data, AuditStorageReq
return Void();
}
// Check consistency between KeyServers and ServerKeys system key space
ACTOR Future<Void> auditShardLocationMetadataQ(StorageServer* data, AuditStorageRequest req) {
ASSERT(req.getType() == AuditType::ValidateLocationMetadata);
wait(data->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield));
state FlowLock::Releaser holder(data->serveAuditStorageParallelismLock);
TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataBegin", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range);
state AuditStorageState res(req.id, req.getType()); // we will set range of audit later
state std::vector<Future<Void>> actors;
state std::vector<std::string> errors;
state AuditGetKeyServersRes keyServerRes;
state std::unordered_map<UID, AuditGetServerKeysRes> serverKeyResMap;
state Version readAtVersion;
state std::unordered_map<UID, std::vector<KeyRange>> mapFromKeyServersRaw;
// Note that since krmReadRange may not return the value of the entire range at a time
// Given req.range, a part of the range is returned, thus, only a part of the range is
// able to be compared --- claimRange
// Given claimRange, rangeToRead is decided for reading the remaining range
// At beginning, rangeToRead is req.range
state KeyRange claimRange;
state KeyRange completeRangeByKeyServer;
// To compare
state std::unordered_map<UID, std::vector<KeyRange>> mapFromServerKeys;
state std::unordered_map<UID, std::vector<KeyRange>> mapFromKeyServers;
state Transaction tr(data->cx);
state Key rangeToReadBegin = req.range.begin;
state KeyRangeRef rangeToRead;
state int64_t cumulatedValidatedServerKeysNum = 0;
state int64_t cumulatedValidatedKeyServersNum = 0;
state Reference<IRateControl> rateLimiter =
Reference<IRateControl>(new SpeedLimit(SERVER_KNOBS->AUDIT_STORAGE_RATE_PER_SERVER_MAX, 1));
state int64_t remoteReadBytes = 0;
try {
loop {
try {
// Read
actors.clear();
errors.clear();
mapFromServerKeys.clear();
mapFromKeyServers.clear();
serverKeyResMap.clear();
mapFromKeyServersRaw.clear();
remoteReadBytes = 0;
rangeToRead = KeyRangeRef(rangeToReadBegin, req.range.end);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
// Read KeyServers
wait(store(keyServerRes, getShardMapFromKeyServers(data->thisServerID, &tr, rangeToRead)));
completeRangeByKeyServer = keyServerRes.completeRange;
readAtVersion = keyServerRes.readAtVersion;
mapFromKeyServersRaw = keyServerRes.rangeOwnershipMap;
remoteReadBytes += keyServerRes.readBytes;
// Use ssid of mapFromKeyServersRaw to read ServerKeys
for (auto& [ssid, _] : mapFromKeyServersRaw) {
actors.push_back(
store(serverKeyResMap[ssid], getThisServerKeysFromServerKeys(ssid, &tr, rangeToRead)));
}
wait(waitForAll(actors));
// Decide claimRange and check readAtVersion
claimRange = completeRangeByKeyServer;
for (auto& [ssid, serverKeyRes] : serverKeyResMap) {
KeyRange serverKeyCompleteRange = serverKeyRes.completeRange;
TraceEvent(SevVerbose, "SSAuditStorageShardLocMetadataGetClaimRange")
.detail("ServerId", ssid)
.detail("ServerKeyCompleteRange", serverKeyCompleteRange)
.detail("CurrentClaimRange", claimRange);
KeyRange overlappingRange = serverKeyCompleteRange & claimRange;
if (serverKeyCompleteRange.begin != claimRange.begin || overlappingRange.empty() ||
readAtVersion != serverKeyRes.readAtVersion) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"SSAuditStorageShardLocMetadataReadCheckWrong",
data->thisServerID)
.detail("ServerKeyCompleteRangeBegin", serverKeyCompleteRange.begin)
.detail("ClaimRangeBegin", claimRange.begin)
.detail("OverlappingRange", overlappingRange)
.detail("ReadAtVersion", readAtVersion)
.detail("ServerKeyResReadAtVersion", serverKeyRes.readAtVersion);
throw audit_storage_cancelled();
}
claimRange = overlappingRange;
remoteReadBytes += serverKeyRes.readBytes;
}
// Use claimRange to get mapFromServerKeys and mapFromKeyServers to compare
int64_t numValidatedServerKeys = 0;
for (auto& [ssid, serverKeyRes] : serverKeyResMap) {
for (auto& range : serverKeyRes.ownRanges) {
KeyRange overlappingRange = range & claimRange;
if (overlappingRange.empty()) {
continue;
}
TraceEvent(SevVerbose, "SSAuditStorageShardLocMetadataAddToServerKeyMap")
.detail("RawRange", range)
.detail("ClaimRange", claimRange)
.detail("Range", overlappingRange)
.detail("SSID", ssid);
mapFromServerKeys[ssid].push_back(overlappingRange);
numValidatedServerKeys++;
}
}
cumulatedValidatedServerKeysNum = cumulatedValidatedServerKeysNum + numValidatedServerKeys;
int64_t numValidatedKeyServers = 0;
for (auto& [ssid, ranges] : mapFromKeyServersRaw) {
std::vector mergedRanges = coalesceRangeList(ranges);
for (auto& range : mergedRanges) {
KeyRange overlappingRange = range & claimRange;
if (overlappingRange.empty()) {
continue;
}
TraceEvent(SevVerbose, "SSAuditStorageShardLocMetadataAddToKeyServerMap")
.detail("RawRange", range)
.detail("ClaimRange", claimRange)
.detail("Range", overlappingRange)
.detail("SSID", ssid);
mapFromKeyServers[ssid].push_back(overlappingRange);
numValidatedKeyServers++;
}
}
cumulatedValidatedKeyServersNum = cumulatedValidatedKeyServersNum + numValidatedKeyServers;
// Compare: check if mapFromKeyServers === mapFromServerKeys
// 1. check mapFromKeyServers => mapFromServerKeys
for (auto& [ssid, keyServerRanges] : mapFromKeyServers) {
if (!mapFromServerKeys.contains(ssid)) {
std::string error =
format("KeyServers and serverKeys mismatch: Some key in range(%s, %s) exists "
"on Server(%s) in KeyServers but not ServerKeys",
claimRange.toString().c_str(),
claimRange.toString().c_str(),
ssid.toString().c_str());
errors.push_back(error);
TraceEvent(SevError, "SSAuditStorageShardLocationMetadataError", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("ClaimRange", claimRange)
.detail("ErrorMessage", error)
.detail("AuditServerID", data->thisServerID);
}
std::vector<KeyRange> serverKeyRanges = mapFromServerKeys[ssid];
Optional<std::pair<KeyRange, KeyRange>> anyMismatch = rangesSame(keyServerRanges, serverKeyRanges);
if (anyMismatch.present()) { // mismatch detected
KeyRange mismatchedRangeByKeyServer = anyMismatch.get().first;
KeyRange mismatchedRangeByServerKey = anyMismatch.get().second;
std::string error =
format("KeyServers and serverKeys mismatch on Server(%s): KeyServer: %s; ServerKey: %s",
ssid.toString().c_str(),
mismatchedRangeByKeyServer.toString().c_str(),
mismatchedRangeByServerKey.toString().c_str());
errors.push_back(error);
TraceEvent(SevError, "SSAuditStorageShardLocationMetadataError", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("ClaimRange", claimRange)
.detail("ErrorMessage", error)
.detail("MismatchedRangeByKeyServer", mismatchedRangeByKeyServer)
.detail("MismatchedRangeByServerKey", mismatchedRangeByServerKey)
.detail("AuditServerID", data->thisServerID);
}
}
// 2. check mapFromServerKeys => mapFromKeyServers
for (auto& [ssid, serverKeyRanges] : mapFromServerKeys) {
if (!mapFromKeyServers.contains(ssid)) {
std::string error =
format("KeyServers and serverKeys mismatch: Some key of range(%s, %s) exists "
"on Server(%s) in ServerKeys but not KeyServers",
claimRange.toString().c_str(),
claimRange.toString().c_str(),
ssid.toString().c_str());
errors.push_back(error);
TraceEvent(SevError, "SSAuditStorageShardLocationMetadataError", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("ClaimRange", claimRange)
.detail("ErrorMessage", error)
.detail("AuditServerID", data->thisServerID);
}
}
// Log statistic
TraceEvent(SevInfo, "SSAuditStorageStatisticLocationMetadata", data->thisServerID)
.suppressFor(30.0)
.detail("AuditType", req.getType())
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("CurrentValidatedServerKeysNum", numValidatedServerKeys)
.detail("CurrentValidatedKeyServersNum", numValidatedServerKeys)
.detail("CurrentValidatedInclusiveRange", claimRange)
.detail("CumulatedValidatedServerKeysNum", cumulatedValidatedServerKeysNum)
.detail("CumulatedValidatedKeyServersNum", cumulatedValidatedKeyServersNum)
.detail("CumulatedValidatedInclusiveRange", KeyRangeRef(req.range.begin, claimRange.end));
// Return result
if (!errors.empty()) {
TraceEvent(SevError, "SSAuditStorageShardLocMetadataError", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("NumErrors", errors.size())
.detail("Version", readAtVersion)
.detail("AuditServerId", data->thisServerID)
.detail("ClaimRange", claimRange);
res.range = claimRange;
res.setPhase(AuditPhase::Error);
if (!req.ddId.isValid()) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"SSAuditStorageShardLocMetadataDDIdInvalid",
data->thisServerID);
throw audit_storage_cancelled();
}
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByRange(data->cx, res));
req.reply.sendError(audit_storage_error());
break;
} else {
// Expand persisted complete range
res.range = Standalone(KeyRangeRef(req.range.begin, claimRange.end));
res.setPhase(AuditPhase::Complete);
if (!req.ddId.isValid()) {
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways,
"SSAuditStorageShardLocMetadataDDIdInvalid",
data->thisServerID);
throw audit_storage_cancelled();
}
res.ddId = req.ddId; // used to compare req.ddId with existing persisted ddId
wait(persistAuditStateByRange(data->cx, res));
if (res.range.end < req.range.end) {
TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataPartialDone", data->thisServerID)
.suppressFor(10.0)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("Version", readAtVersion)
.detail("AuditServerId", data->thisServerID)
.detail("CompleteRange", res.range);
rangeToReadBegin = res.range.end;
} else { // complete
req.reply.send(res);
break;
}
}
} catch (Error& e) {
wait(tr.onError(e));
}
wait(rateLimiter->getAllowance(remoteReadBytes)); // Rate Keeping
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
return Void(); // sliently exit
}
TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataFailed", data->thisServerID)
.errorUnsuppressed(e)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("AuditServer", data->thisServerID);
if (e.code() == error_code_audit_storage_cancelled) {
req.reply.sendError(audit_storage_cancelled());
} else {
req.reply.sendError(audit_storage_failed());
}
}
TraceEvent(SevInfo, "SSAuditStorageShardLocMetadataComplete", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("AuditServer", data->thisServerID)
.detail("CompleteRange", res.range)
.detail("NumValidatedServerKeys", cumulatedValidatedServerKeysNum)
.detail("NumValidatedKeyServers", cumulatedValidatedKeyServersNum);
return Void();
}
ACTOR Future<Void> auditStorageShardReplicaQ(StorageServer* data, AuditStorageRequest req) {
ASSERT(req.getType() == AuditType::ValidateHA || req.getType() == AuditType::ValidateReplica);
wait(data->serveAuditStorageParallelismLock.take(TaskPriority::DefaultYield));
@@ -6305,6 +5743,14 @@ ACTOR Future<Void> auditStorageShardReplicaQ(StorageServer* data, AuditStorageRe
// Expand persisted complete range
if (complete) {
req.reply.send(res);
TraceEvent(SevInfo, "SSAuditStorageShardReplicaComplete", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("AuditServer", data->thisServerID)
.detail("CompleteRange", res.range)
.detail("CheckTimes", checkTimes)
.detail("NumValidatedKeys", numValidatedKeys)
.detail("ValidatedBytes", validatedBytes);
break;
} else {
TraceEvent(SevInfo, "SSAuditStorageShardReplicaPartialDone", data->thisServerID)
@@ -6339,14 +5785,6 @@ ACTOR Future<Void> auditStorageShardReplicaQ(StorageServer* data, AuditStorageRe
req.reply.sendError(audit_storage_failed());
}
}
TraceEvent(SevInfo, "SSAuditStorageShardReplicaComplete", data->thisServerID)
.detail("AuditId", req.id)
.detail("AuditRange", req.range)
.detail("AuditServer", data->thisServerID)
.detail("CompleteRange", res.range)
.detail("CheckTimes", checkTimes)
.detail("NumValidatedKeys", numValidatedKeys)
.detail("ValidatedBytes", validatedBytes);
return Void();
}
@@ -14017,7 +13455,8 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
}
when(AuditStorageRequest req = waitNext(ssi.auditStorage.getFuture())) {
// Check req
- if (!req.id.isValid() || !req.ddId.isValid() || req.range.empty()) {
+ if (!req.id.isValid() || !req.ddId.isValid() || req.range.empty() ||
+ req.getType() == AuditType::ValidateLocationMetadata) {
// ddId is used when persist progress
TraceEvent(g_network->isSimulated() ? SevError : SevWarnAlways, "AuditRequestInvalid") // unexpected
.detail("AuditRange", req.range)
@@ -14033,8 +13472,6 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
self->actors.add(auditStorageShardReplicaQ(self, req));
} else if (req.getType() == AuditType::ValidateReplica) {
self->actors.add(auditStorageShardReplicaQ(self, req));
- } else if (req.getType() == AuditType::ValidateLocationMetadata) {
- self->actors.add(auditShardLocationMetadataQ(self, req));
} else if (req.getType() == AuditType::ValidateStorageServerShard) {
self->actors.add(auditStorageServerShardQ(self, req));
} else {